Skip to content

Commit 977caf6

Browse files
authored
use source index in kqp cursor (#29789)
1 parent 3d17fe9 commit 977caf6

File tree

26 files changed

+184
-204
lines changed

26 files changed

+184
-204
lines changed

ydb/core/protos/kqp.proto

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -758,18 +758,19 @@ message TEvKqpScanCursor {
758758
message TColumnShardScanPlain {
759759
}
760760
message TColumnShardScanSimple {
761-
optional uint64 SourceId = 1;
761+
optional uint64 SourceIdx = 1;
762762
optional uint32 StartRecordIndex = 2;
763763
}
764764
message TColumnShardScanNotSortedSimple {
765-
optional uint64 SourceId = 1;
765+
optional uint64 SourceIdx = 1;
766766
optional uint32 StartRecordIndex = 2;
767767
}
768768
oneof Implementation {
769769
TColumnShardScanPlain ColumnShardPlain = 10;
770-
TColumnShardScanSimple ColumnShardSimple = 11;
771-
TColumnShardScanNotSortedSimple ColumnShardNotSortedSimple = 12;
770+
TColumnShardScanSimple ColumnShardSimple = 13;
771+
TColumnShardScanNotSortedSimple ColumnShardNotSortedSimple = 14;
772772
}
773+
reserved 11, 12; // deprecated
773774
}
774775

775776
message TEvRemoteScanData {

ydb/core/tx/columnshard/engines/predicate/filter.h

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class IScanCursor {
155155

156156
virtual const std::shared_ptr<NArrow::TSimpleRow>& DoGetPKCursor() const = 0;
157157
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const = 0;
158-
virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const = 0;
158+
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const = 0;
159159
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) = 0;
160160
virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const = 0;
161161

@@ -168,9 +168,9 @@ class IScanCursor {
168168
return DoGetPKCursor();
169169
}
170170

171-
bool CheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const {
171+
bool CheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const {
172172
AFL_VERIFY(IsInitialized());
173-
return DoCheckSourceIntervalUsage(sourceId, indexStart, recordsCount);
173+
return DoCheckSourceIntervalUsage(sourceIdx, indexStart, recordsCount);
174174
}
175175

176176
bool CheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const {
@@ -198,11 +198,12 @@ class IScanCursor {
198198
class TSimpleScanCursor: public IScanCursor {
199199
private:
200200
YDB_READONLY_DEF(std::shared_ptr<NArrow::TSimpleRow>, PrimaryKey);
201-
YDB_READONLY(ui64, SourceId, 0);
201+
std::optional<ui32> SourceIdx;
202202
YDB_READONLY(ui32, RecordIndex, 0);
203203

204204
virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
205-
proto.MutableColumnShardSimple()->SetSourceId(SourceId);
205+
AFL_VERIFY(SourceIdx);
206+
proto.MutableColumnShardSimple()->SetSourceIdx(*SourceIdx);
206207
proto.MutableColumnShardSimple()->SetStartRecordIndex(RecordIndex);
207208
}
208209

@@ -211,11 +212,12 @@ class TSimpleScanCursor: public IScanCursor {
211212
}
212213

213214
virtual bool IsInitialized() const override {
214-
return !!SourceId;
215+
return !!SourceIdx;
215216
}
216217

217218
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const override {
218-
if (SourceId != entity.GetEntityId()) {
219+
AFL_VERIFY(SourceIdx);
220+
if (*SourceIdx != entity.GetEntityId()) {
219221
return false;
220222
}
221223
if (!entity.GetEntityRecordsCount()) {
@@ -231,19 +233,20 @@ class TSimpleScanCursor: public IScanCursor {
231233
if (!proto.HasColumnShardSimple()) {
232234
return TConclusionStatus::Fail("absent sorted cursor data");
233235
}
234-
if (!proto.GetColumnShardSimple().HasSourceId()) {
236+
if (!proto.GetColumnShardSimple().HasSourceIdx()) {
235237
return TConclusionStatus::Fail("incorrect source id for cursor initialization");
236238
}
237-
SourceId = proto.GetColumnShardSimple().GetSourceId();
239+
SourceIdx = proto.GetColumnShardSimple().GetSourceIdx();
238240
if (!proto.GetColumnShardSimple().HasStartRecordIndex()) {
239241
return TConclusionStatus::Fail("incorrect record index for cursor initialization");
240242
}
241243
RecordIndex = proto.GetColumnShardSimple().GetStartRecordIndex();
242244
return TConclusionStatus::Success();
243245
}
244246

245-
virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const override {
246-
AFL_VERIFY(sourceId == SourceId);
247+
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const override {
248+
AFL_VERIFY(SourceIdx);
249+
AFL_VERIFY(sourceIdx == *SourceIdx);
247250
if (indexStart >= RecordIndex) {
248251
return true;
249252
}
@@ -254,22 +257,23 @@ class TSimpleScanCursor: public IScanCursor {
254257
public:
255258
TSimpleScanCursor() = default;
256259

257-
TSimpleScanCursor(const std::shared_ptr<NArrow::TSimpleRow>& pk, const ui64 portionId, const ui32 recordIndex)
260+
TSimpleScanCursor(const std::shared_ptr<NArrow::TSimpleRow>& pk, const ui32 sourceIdx, const ui32 recordIndex)
258261
: PrimaryKey(pk)
259-
, SourceId(portionId)
262+
, SourceIdx(sourceIdx)
260263
, RecordIndex(recordIndex)
261264
{
262265
}
263266
};
264267

265268
class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
266269
private:
267-
YDB_READONLY(ui64, SourceId, 0);
270+
std::optional<ui32> SourceIdx;
268271
YDB_READONLY(ui32, RecordIndex, 0);
269272

270273
virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
271274
auto& data = *proto.MutableColumnShardNotSortedSimple();
272-
data.SetSourceId(SourceId);
275+
AFL_VERIFY(SourceIdx);
276+
data.SetSourceIdx(*SourceIdx);
273277
data.SetStartRecordIndex(RecordIndex);
274278
}
275279

@@ -278,11 +282,12 @@ class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
278282
}
279283

280284
virtual bool IsInitialized() const override {
281-
return !!SourceId;
285+
return !!SourceIdx;
282286
}
283287

284288
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const override {
285-
if (SourceId != entity.GetEntityId()) {
289+
AFL_VERIFY(SourceIdx);
290+
if (*SourceIdx != entity.GetEntityId()) {
286291
return false;
287292
}
288293
if (!entity.GetEntityRecordsCount()) {
@@ -299,19 +304,20 @@ class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
299304
return TConclusionStatus::Fail("absent unsorted cursor data");
300305
}
301306
auto& data = proto.GetColumnShardNotSortedSimple();
302-
if (!data.HasSourceId()) {
303-
return TConclusionStatus::Fail("incorrect source id for cursor initialization");
307+
if (!data.HasSourceIdx()) {
308+
return TConclusionStatus::Fail("incorrect source index for cursor initialization");
304309
}
305-
SourceId = data.GetSourceId();
310+
SourceIdx = data.GetSourceIdx();
306311
if (!data.HasStartRecordIndex()) {
307312
return TConclusionStatus::Fail("incorrect record index for cursor initialization");
308313
}
309314
RecordIndex = data.GetStartRecordIndex();
310315
return TConclusionStatus::Success();
311316
}
312317

313-
virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const override {
314-
AFL_VERIFY(sourceId == SourceId);
318+
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const override {
319+
AFL_VERIFY(SourceIdx);
320+
AFL_VERIFY(sourceIdx == *SourceIdx);
315321
if (indexStart >= RecordIndex) {
316322
return true;
317323
}
@@ -322,8 +328,8 @@ class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
322328
public:
323329
TNotSortedSimpleScanCursor() = default;
324330

325-
TNotSortedSimpleScanCursor(const ui64 portionId, const ui32 recordIndex)
326-
: SourceId(portionId)
331+
TNotSortedSimpleScanCursor(const ui32 sourceIdx, const ui32 recordIndex)
332+
: SourceIdx(sourceIdx)
327333
, RecordIndex(recordIndex)
328334
{
329335
}
@@ -355,7 +361,7 @@ class TPlainScanCursor: public IScanCursor {
355361
return true;
356362
}
357363

358-
virtual bool DoCheckSourceIntervalUsage(const ui64 /*sourceId*/, const ui32 /*indexStart*/, const ui32 /*recordsCount*/) const override {
364+
virtual bool DoCheckSourceIntervalUsage(const ui32 /*sourceIdx*/, const ui32 /*indexStart*/, const ui32 /*recordsCount*/) const override {
359365
return true;
360366
}
361367

ydb/core/tx/columnshard/engines/reader/common/comparable.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ class TReplaceKeyAdapter {
2424
const NArrow::TSimpleRow& GetValue() const {  // Read-only access to the wrapped row; returns a reference, so no copy is made.
2525
return Value;
2626
}
27-
2827
NArrow::TSimpleRow CopyValue() const {  // Returns the wrapped row by value; the adapter itself is left unchanged.
2928
return Value;
3029
}
30+
NArrow::TSimpleRow&& ExtractValue() && {  // Rvalue-qualified: only callable on an expiring adapter, e.g. std::move(adapter).ExtractValue().
31+
return std::move(Value);  // Returns a reference to the member rather than a new object. NOTE(review): the caller must consume it before the adapter is destroyed.
32+
}
3133

3234
explicit TReplaceKeyAdapter(NArrow::TSimpleRow&& rk, const bool reverse)
3335
: Reverse(reverse)

ydb/core/tx/columnshard/engines/reader/common_reader/common/accessors_ordering.h

Lines changed: 87 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,88 @@
11
#pragma once
22
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
3+
#include <ydb/core/tx/columnshard/engines/reader/common/comparable.h>
34
#include <ydb/core/tx/columnshard/engines/reader/common/description.h>
45
#include <ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h>
56
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h>
67

78
namespace NKikimr::NOlap::NReader::NCommon {
8-
template <class TObject>
9+
10+
class TDataSourceConstructor: public ICursorEntity, public TMoveOnly {  // Base record for a not-yet-built data source: its id, start/finish keys and a position index assigned once.
11+
private:
12+
ui32 SourceId = 0;  // Identifier supplied at construction; verified non-zero by the constructor.
13+
TReplaceKeyAdapter Start;
14+
TReplaceKeyAdapter Finish;
15+
ui32 SourceIdx = 0;  // Meaningful only after SetIndex() has been called (see SourceIdxInitialized).
16+
bool SourceIdxInitialized = false;
17+
18+
virtual ui64 DoGetEntityId() const override {  // The entity id exposed through ICursorEntity is the assigned index, not SourceId.
19+
return GetSourceIdx();
20+
}
21+
22+
public:
23+
ui32 GetSourceId() const {
24+
return SourceId;
25+
}
26+
27+
void SetIndex(const ui32 index) {  // One-shot setter: a second call fails the verification below.
28+
AFL_VERIFY(!SourceIdxInitialized);
29+
SourceIdxInitialized = true;
30+
SourceIdx = index;
31+
}
32+
33+
ui32 GetSourceIdx() const {  // Verifies that SetIndex() was called before returning the index.
34+
AFL_VERIFY(SourceIdxInitialized);
35+
return SourceIdx;
36+
}
37+
38+
TReplaceKeyAdapter ExtractStart() {  // Moves the start key out; Start is left in a moved-from state afterwards.
39+
return std::move(Start);
40+
}
41+
42+
TReplaceKeyAdapter ExtractFinish() {  // Moves the finish key out; Finish is left in a moved-from state afterwards.
43+
return std::move(Finish);
44+
}
45+
46+
TDataSourceConstructor(const ui32 sourceId, TReplaceKeyAdapter&& start, TReplaceKeyAdapter&& finish)
47+
: SourceId(sourceId)
48+
, Start(std::move(start))
49+
, Finish(std::move(finish))
50+
{
51+
AFL_VERIFY(SourceId);  // A zero source id is rejected.
52+
}
53+
54+
const TReplaceKeyAdapter& GetStart() const {
55+
return Start;
56+
}
57+
const TReplaceKeyAdapter& GetFinish() const {
58+
return Finish;
59+
}
60+
61+
class TComparator {  // Ordering predicate over constructors, keyed by (Start, SourceId).
62+
private:
63+
const ERequestSorting Sorting;  // Only validated in the constructor; operator() does not read it.
64+
65+
public:
66+
TComparator(const ERequestSorting sorting)
67+
: Sorting(sorting)
68+
{
69+
AFL_VERIFY(Sorting != ERequestSorting::NONE);
70+
}
71+
72+
bool operator()(const TDataSourceConstructor& l, const TDataSourceConstructor& r) const {  // Note the swapped operands: true when r's (Start, SourceId) is less than l's.
73+
return std::pair<const TReplaceKeyAdapter&, ui32>(r.Start, r.SourceId) < std::pair<const TReplaceKeyAdapter&, ui32>(l.Start, l.SourceId);  // Compare through references: std::make_pair would copy both Start keys on every call.
74+
}
75+
};
76+
};
77+
78+
template <std::derived_from<TDataSourceConstructor> TObject>
979
class TOrderedObjects {
1080
private:
1181
const ERequestSorting Sorting;
1282
std::deque<TObject> HeapObjects;
1383
YDB_READONLY_DEF(std::deque<TObject>, AlreadySorted);
1484
bool Initialized = false;
85+
ui32 NextObjectIdx = 0;
1586

1687
public:
1788
TOrderedObjects(const ERequestSorting sorting)
@@ -32,10 +103,10 @@ class TOrderedObjects {
32103

33104
TObject& MutableNextObject() {
34105
AFL_VERIFY(GetSize());
35-
if (AlreadySorted.size()) {
36-
return AlreadySorted.front();
106+
if (AlreadySorted.empty()) {
107+
PrepareOrdered(1);
37108
}
38-
return HeapObjects.front();
109+
return AlreadySorted.front();
39110
}
40111

41112
void Initialize(std::deque<TObject>&& objects) {
@@ -46,30 +117,32 @@ class TOrderedObjects {
46117
std::make_heap(HeapObjects.begin(), HeapObjects.end(), typename TObject::TComparator(Sorting));
47118
} else {
48119
AlreadySorted = std::move(objects);
120+
for (auto& source : AlreadySorted) {
121+
source.SetIndex(NextObjectIdx++);
122+
}
49123
}
50124
}
51125

52126
void PrepareOrdered(const ui32 count) {  // Moves objects from the heap into AlreadySorted until it holds at least `count` entries or the heap is empty.
53127
if (Sorting != ERequestSorting::NONE) {
54128
while (AlreadySorted.size() < count && HeapObjects.size()) {
55129
std::pop_heap(HeapObjects.begin(), HeapObjects.end(), typename TObject::TComparator(Sorting));  // Moves the heap's top element to the back of the deque.
130+
HeapObjects.back().SetIndex(NextObjectIdx++);  // Indices are handed out in extraction order, before the object is moved.
56131
AlreadySorted.emplace_back(std::move(HeapObjects.back()));
57132
HeapObjects.pop_back();  // Drops the moved-from slot left at the back.
58133
}
134+
} else {
135+
AFL_VERIFY(HeapObjects.empty());  // With Sorting == NONE the heap is expected to be unused.
59136
}
60137
}
61138

62139
TObject PopFront() {
63-
if (AlreadySorted.size()) {
64-
AFL_VERIFY(AlreadySorted.size());
65-
auto result = std::move(AlreadySorted.front());
66-
AlreadySorted.pop_front();
67-
return result;
140+
if (AlreadySorted.empty()) {
141+
PrepareOrdered(1);
68142
}
69-
AFL_VERIFY(HeapObjects.size());
70-
std::pop_heap(HeapObjects.begin(), HeapObjects.end(), typename TObject::TComparator(Sorting));
71-
auto result = std::move(HeapObjects.back());
72-
HeapObjects.pop_back();
143+
AFL_VERIFY(AlreadySorted.size());
144+
auto result = std::move(AlreadySorted.front());
145+
AlreadySorted.pop_front();
73146
return result;
74147
}
75148

@@ -144,7 +217,7 @@ class TSourcesConstructorWithAccessorsImpl: public ISourcesConstructor {
144217
}
145218
};
146219

147-
template <class TConstructor>
220+
template <std::derived_from<TDataSourceConstructor> TConstructor>
148221
class TSourcesConstructorWithAccessors: public TSourcesConstructorWithAccessorsImpl {
149222
private:
150223
TOrderedObjects<TConstructor> Constructors;

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,6 @@ class TReadMetadata: public TReadMetadataBase {
122122
public:
123123
using TConstPtr = std::shared_ptr<const TReadMetadata>;
124124

125-
void SetSelectInfo(std::unique_ptr<ISourcesConstructor>&& value) {
126-
AFL_VERIFY(!SourcesConstructor);
127-
SourcesConstructor = std::move(value);
128-
}
129-
130125
std::unique_ptr<ISourcesConstructor> ExtractSelectInfo() const {
131126
AFL_VERIFY(!!SourcesConstructor);
132127
return std::move(SourcesConstructor);

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource {
102102
virtual bool DoAddTxConflict() = 0;
103103

104104
virtual ui64 DoGetEntityId() const override {
105-
return SourceId;
105+
return SourceIdx;
106106
}
107107

108108
virtual ui64 DoGetEntityRecordsCount() const override;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ std::vector<TInsertWriteId> TPortionsSources::GetUncommittedWriteIds() const {
3535

3636
std::shared_ptr<TPortionDataSource> TSourceConstructor::Construct(
3737
const std::shared_ptr<NCommon::TSpecialReadContext>& context, std::shared_ptr<TPortionDataAccessor>&& accessor) const {
38-
AFL_VERIFY(SourceIdx);
39-
auto result = std::make_shared<TPortionDataSource>(*SourceIdx, Portion, context);
38+
auto result = std::make_shared<TPortionDataSource>(GetSourceIdx(), Portion, context);
4039
result->SetPortionAccessor(std::move(accessor));
4140
if (IsStartedByCursorFlag) {
4241
result->SetIsStartedByCursor();

0 commit comments

Comments
 (0)