Skip to content

Commit 2a50c3b

Browse files
authored
introduce new VDisk full sync scheme (SST writer part) (#30365)
1 parent 2a35767 commit 2a50c3b

22 files changed

+524
-107
lines changed

ydb/core/base/blobstorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,8 @@ struct TEvBlobStorage {
775775
EvPhantomFlagStorageGetSnapshot,
776776
EvPhantomFlagStorageGetSnapshotResult,
777777
EvSyncLogUpdateNeighbourSyncedLsn,
778+
EvLocalSyncFinished,
779+
EvFullSyncFinished,
778780

779781
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
780782
EvLogResult,

ydb/core/blobstorage/vdisk/defs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
// define to turn on latency optimization
66
//#define UNPACK_LOCALSYNCDATA
77

8-
8+
//#define USE_NEW_FULL_SYNC_SCHEME
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
#include "defs.h"
4+
5+
namespace NKikimr {
6+
7+
#pragma pack(push, 4)
8+
template <class TKey, class TMemRec>
9+
struct TIndexRecord {
10+
private:
11+
TKey Key;
12+
TMemRec MemRec;
13+
14+
public:
15+
TIndexRecord() = default;
16+
17+
TIndexRecord(const TKey& key)
18+
: Key(key)
19+
, MemRec()
20+
{}
21+
22+
TIndexRecord(const TKey& key, const TMemRec& memRec)
23+
: Key(key)
24+
, MemRec(memRec)
25+
{}
26+
27+
TKey GetKey() const {
28+
return ReadUnaligned<TKey>(&Key);
29+
}
30+
31+
TMemRec GetMemRec() const {
32+
return ReadUnaligned<TMemRec>(&MemRec);
33+
}
34+
35+
bool operator <(const TKey& key) const {
36+
return GetKey() < key;
37+
}
38+
39+
bool operator <(const TIndexRecord& rec) const {
40+
return GetKey() < rec.GetKey();
41+
}
42+
43+
bool operator ==(const TIndexRecord& rec) const {
44+
return GetKey() == rec.GetKey();
45+
}
46+
47+
using TVec = TVector<TIndexRecord>;
48+
};
49+
#pragma pack(pop)
50+
51+
} // NKikimr

ydb/core/blobstorage/vdisk/hulldb/base/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ SRCS(
2020
hullbase_barrier.h
2121
hullbase_block.h
2222
hullbase_logoblob.h
23+
hullbase_rec.h
2324
hullds_arena.h
2425
hullds_generic_it.h
2526
hullds_heap_it.h

ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_appendix.h

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_logoblob.h>
66
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h>
77
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h>
8+
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_rec.h>
89
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullds_generic_it.h>
910
// FIXME
1011
#include <ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h>
@@ -20,39 +21,8 @@ namespace NKikimr {
2021
template <class TKey, class TMemRec>
2122
class TFreshAppendix {
2223
public:
23-
struct TRecord {
24-
private:
25-
TKey Key;
26-
TMemRec MemRec;
27-
28-
public:
29-
TKey GetKey() const {
30-
return ReadUnaligned<TKey>(&Key);
31-
}
32-
33-
TMemRec GetMemRec() const {
34-
return ReadUnaligned<TMemRec>(&MemRec);
35-
}
36-
37-
bool operator <(const TKey &key) const {
38-
return GetKey() < key;
39-
}
40-
41-
bool operator <(const TRecord &rec) const {
42-
return GetKey() < rec.GetKey();
43-
}
44-
45-
bool operator ==(const TRecord &rec) const {
46-
return GetKey() == rec.GetKey();
47-
}
48-
49-
TRecord(const TKey &key, const TMemRec &memRec)
50-
: Key(key)
51-
, MemRec(memRec)
52-
{}
53-
};
54-
55-
using TVec = TVector<TRecord>;
24+
using TRecord = TIndexRecord<TKey, TMemRec>;
25+
using TVec = TRecord::TVec;
5626

5727
TFreshAppendix(const TMemoryConsumer &memConsumer)
5828
: MemConsumed(memConsumer)
@@ -93,6 +63,10 @@ namespace NKikimr {
9363
return sizeof(TRecord) * SortedRecs.size();
9464
}
9565

66+
// Moves the SortedRecs container out of the appendix and returns it to the
// caller; SortedRecs is left in a moved-from state afterwards.
TVec Extract() {
    TVec extracted(std::move(SortedRecs));
    return extracted;
}
69+
9670
::NMonitoring::TDynamicCounters::TCounterPtr GetCounter() const {
9771
return MemConsumed.GetCounter();
9872
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#pragma once
2+
3+
#include "blobstorage_hullwritesst.h"
4+
5+
namespace NKikimr {
6+
7+
template <class TKey, class TMemRec>
8+
class TIndexSstWriter {
9+
using TRec = TIndexRecord<TKey, TMemRec>;
10+
using TLevelSegment = TLevelSegment<TKey, TMemRec>;
11+
12+
TVDiskContextPtr VCtx;
13+
TPDiskCtxPtr PDiskCtx;
14+
TIntrusivePtr<TLevelIndex<TKey, TMemRec>> LevelIndex;
15+
TQueue<std::unique_ptr<NPDisk::TEvChunkWrite>>& MsgQueue;
16+
17+
std::unique_ptr<TBufferedChunkWriter> Writer;
18+
ui32 Items = 0;
19+
ui32 ChunkIdx = 0;
20+
ui64 SstId = 0;
21+
TIndexRecord<TKey, TMemRec>::TVec PostponedRecs;
22+
23+
TTrackableVector<TRec> Recs;
24+
TIntrusivePtr<TLevelSegment> LevelSegment;
25+
26+
void PutPlaceHolder() {
27+
auto& info = LevelSegment->Info;
28+
info.FirstLsn = 0;
29+
info.LastLsn = 0;
30+
info.InplaceDataTotalSize = 0;
31+
info.HugeDataTotalSize = 0;
32+
info.IdxTotalSize = sizeof(TRec) * Items;
33+
info.Chunks = 1;
34+
info.IndexParts = 1;
35+
info.Items = Items;
36+
info.ItemsWithInplacedData = 0;
37+
info.ItemsWithHugeData = 0;
38+
info.OutboundItems = 0;
39+
info.CTime = TAppData::TimeProvider->Now();
40+
41+
if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) {
42+
LevelSegment->LoadLinearIndex(Recs);
43+
Recs.clear();
44+
} else {
45+
Recs.shrink_to_fit();
46+
LevelSegment->LoadedIndex = std::move(Recs);
47+
}
48+
49+
TVector<ui32> usedChunks;
50+
usedChunks.push_back(ChunkIdx);
51+
LevelSegment->AllChunks = std::move(usedChunks);
52+
53+
LevelSegment->AssignedSstId = SstId;
54+
55+
auto ratio = MakeIntrusive<NHullComp::TSstRatio>();
56+
ratio->IndexItemsTotal = ratio->IndexItemsKeep = info.Items;
57+
ratio->IndexBytesTotal = ratio->IndexBytesKeep = info.IdxTotalSize;
58+
ratio->InplacedDataTotal = ratio->InplacedDataKeep = 0;
59+
ratio->HugeDataTotal = ratio->HugeDataKeep = 0;
60+
LevelSegment->StorageRatio.Set(ratio);
61+
62+
TIdxDiskPlaceHolder placeHolder(SstId);
63+
placeHolder.Info = info;
64+
placeHolder.PrevPart = {};
65+
66+
LevelSegment->LastPartAddr = TDiskPart(ChunkIdx, 0, info.IdxTotalSize + sizeof(TIdxDiskPlaceHolder));
67+
LevelSegment->IndexParts.push_back(LevelSegment->LastPartAddr);
68+
Writer->Push(&placeHolder, sizeof(placeHolder));
69+
}
70+
71+
public:
72+
TIndexSstWriter(
73+
TVDiskContextPtr vCtx,
74+
TPDiskCtxPtr pDiskCtx,
75+
TIntrusivePtr<TLevelIndex<TKey, TMemRec>> levelIndex,
76+
TQueue<std::unique_ptr<NPDisk::TEvChunkWrite>>& msgQueue)
77+
: VCtx(std::move(vCtx))
78+
, PDiskCtx(std::move(pDiskCtx))
79+
, LevelIndex(levelIndex)
80+
, MsgQueue(msgQueue)
81+
, Recs(TMemoryConsumer(VCtx->SstIndex))
82+
{}
83+
84+
bool Push(const TIndexRecord<TKey, TMemRec>::TVec& records) {
85+
if (!Writer) {
86+
PostponedRecs.insert(PostponedRecs.end(), records.begin(), records.end());
87+
return false;
88+
}
89+
90+
auto freeSpace = Writer->GetFreeSpace();
91+
auto recsSize = sizeof(TRec) * records.size();
92+
93+
if (recsSize + sizeof(TIdxDiskPlaceHolder) <= freeSpace) {
94+
Recs.insert(Recs.end(), records.begin(), records.end());
95+
Writer->Push(records.begin(), recsSize);
96+
Items += records.size();
97+
return true;
98+
}
99+
100+
ui32 recsFit = (freeSpace - sizeof(TIdxDiskPlaceHolder)) / sizeof(TRec);
101+
Writer->Push(records.begin(), sizeof(TRec) * recsFit);
102+
Items += recsFit;
103+
auto it = records.begin() + recsFit;
104+
Recs.insert(Recs.end(), records.begin(), it);
105+
PostponedRecs.insert(PostponedRecs.end(), it, records.end());
106+
107+
FinishChunk();
108+
return false;
109+
}
110+
111+
void FinishChunk() {
112+
PutPlaceHolder();
113+
Writer->FinishChunk();
114+
Writer.reset();
115+
}
116+
117+
TIntrusivePtr<TLevelSegment> GetSegment() {
118+
return std::move(LevelSegment);
119+
}
120+
121+
void OnChunkReserved(ui32 chunkIdx) {
122+
Writer = std::make_unique<TBufferedChunkWriter>(
123+
TMemoryConsumer(VCtx->SstIndex),
124+
PDiskCtx->Dsk->Owner,
125+
PDiskCtx->Dsk->OwnerRound,
126+
NPriWrite::SyncLog, // ???
127+
(ui32)PDiskCtx->Dsk->ChunkSize,
128+
PDiskCtx->Dsk->AppendBlockSize,
129+
(ui32)PDiskCtx->Dsk->BulkWriteBlockSize,
130+
chunkIdx,
131+
MsgQueue);
132+
133+
Recs.reserve((PDiskCtx->Dsk->ChunkSize - sizeof(TIdxDiskPlaceHolder)) / sizeof(TRec));
134+
135+
LevelSegment = MakeIntrusive<TLevelSegment>(VCtx);
136+
137+
ChunkIdx = chunkIdx;
138+
SstId = LevelIndex->AllocSstId();
139+
140+
// TEvLocalSyncData buffers must be always less than chunk size
141+
bool ok = Push(PostponedRecs);
142+
Y_VERIFY_S(ok, VCtx->VDiskLogPrefix);
143+
PostponedRecs.clear();
144+
}
145+
};
146+
147+
} // NKikimr

ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace NKikimr {
3636
}
3737
}
3838

39-
class TBufferedChunkWriter : public TThrRefBase {
39+
class TBufferedChunkWriter {
4040
public:
4141
TBufferedChunkWriter(TMemoryConsumer&& consumer, ui8 owner, ui64 ownerRound, ui8 priority, ui32 chunkSize,
4242
ui32 appendBlockSize, ui32 writeBlockSize, ui32 chunkIdx,
@@ -412,7 +412,7 @@ namespace NKikimr {
412412
static_assert((SuffixSize >> 2 << 2) == SuffixSize, "expect (SuffixSize >> 2 << 2) == SuffixSize");
413413
static_assert(sizeof(TIdxDiskLinker) <= sizeof(TIdxDiskPlaceHolder), "expect sizeof(TIdxDiskLinker) <= sizeof(TIdxDiskPlaceHolder)");
414414

415-
typedef TRecIndexBase<TKey, TMemRec>::TRec TRec;
415+
typedef TIndexRecord<TKey, TMemRec> TRec;
416416

417417
public:
418418
TIndexBuilder(TVDiskContextPtr vctx, EWriterDataType type, ui8 owner, ui64 ownerRound, ui32 chunkSize,

0 commit comments

Comments
 (0)