Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TDqChannelStorage : public IDqChannelStorage {
NThreading::TFuture<void> IsBlobWrittenFuture_;
};
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
: ActorSystem_(actorSystem)
{
Expand Down Expand Up @@ -91,6 +91,10 @@ class TDqChannelStorage : public IDqChannelStorage {
return true;
}

void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) override {
ActorSystem_->Send(ChannelStorageActorId_, new TEvDqChannelSpilling::TEvSetWakeUpCallback(std::move(wakeUpCallback)));
}

private:
void UpdateWriteStatus() {
for (auto it = WritingBlobs_.begin(); it != WritingBlobs_.end();) {
Expand Down
30 changes: 17 additions & 13 deletions ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s);

#define LOG_T(s) \
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);

class TDqChannelStorageActor : public IDqChannelStorageActor,
public NActors::TActorBootstrapped<TDqChannelStorageActor>
Expand Down Expand Up @@ -98,8 +98,9 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
hFunc(TEvDqSpilling::TEvError, HandleWork);
hFunc(TEvDqChannelSpilling::TEvGet, HandleWork);
hFunc(TEvDqChannelSpilling::TEvPut, HandleWork);
hFunc(TEvDqChannelSpilling::TEvGet, HandleWork);
hFunc(TEvDqChannelSpilling::TEvSetWakeUpCallback, HandleWork);
cFunc(TEvents::TEvPoison::EventType, PassAway);
default:
Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
Expand All @@ -108,30 +109,33 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
}
}

void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) {
auto& msg = *ev->Get();
LOG_T("[TEvPut] blobId: " << msg.BlobId_);

auto opBegin = TInstant::Now();

auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin};
WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
}

void HandleWork(TEvDqChannelSpilling::TEvGet::TPtr& ev) {
auto& msg = *ev->Get();
LOG_T("[TEvGet] blobId: " << msg.BlobId_);

auto opBegin = TInstant::Now();

auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin};
LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_));
}

void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) {
void HandleWork(TEvDqChannelSpilling::TEvSetWakeUpCallback::TPtr& ev) {
auto& msg = *ev->Get();
LOG_T("[TEvPut] blobId: " << msg.BlobId_);

auto opBegin = TInstant::Now();

auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin};
WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
WakeUpCallback_ = std::move(msg.WakeUpCallback_);
}

void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
Expand Down Expand Up @@ -203,7 +207,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,

// BlobId -> blob size + promise that blob is saved
std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;

// BlobId -> promise with requested blob
std::unordered_map<ui64, TLoadingBlobInfo> LoadingBlobs_;

Expand Down
12 changes: 11 additions & 1 deletion ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace NYql::NDq {
struct TDqChannelStorageActorEvents {
enum {
EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30100,
EvGet
EvGet,
EvSetWakeUpCallback
};
};

Expand Down Expand Up @@ -42,6 +43,15 @@ struct TEvDqChannelSpilling {
ui64 BlobId_;
NThreading::TPromise<TBuffer> Promise_;
};

struct TEvSetWakeUpCallback : NActors::TEventLocal<TEvSetWakeUpCallback, TDqChannelStorageActorEvents::EvSetWakeUpCallback> {
TEvSetWakeUpCallback(TWakeUpCallback&& wakeUpCallback)
: WakeUpCallback_(std::move(wakeUpCallback))
{
}

TWakeUpCallback WakeUpCallback_;
};
};

class IDqChannelStorageActor
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/dq/runtime/dq_channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TDqChannelStorageException : public yexception {
class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
public:
using TPtr = TIntrusivePtr<IDqChannelStorage>;
using TWakeUpCallback = std::function<void()>;

public:
virtual ~IDqChannelStorage() = default;
Expand All @@ -30,7 +31,9 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
// It is better to replace Get() with Pull() which will delete blob after read
// (current clients read each blob exactly once)
// Get() will return false if data is not ready yet. Client should repeat Get() in this case
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;

virtual void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) = 0;
};

} // namespace NYql::NDq
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/runtime/ut/ut_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class TMockChannelStorage : public IDqChannelStorage {
return true;
}

void SetWakeUpCallback(TWakeUpCallback&& /* wakeUpCallback */) override {

}

public:
void SetBlankGetRequests(ui32 count) {
GetBlankRequests = count;
Expand Down
Loading