diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index aeae6eb226cf..0c4f463782a0 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -30,7 +30,7 @@ class TDqChannelStorage : public IDqChannelStorage { NThreading::TFuture IsBlobWrittenFuture_; }; public: - TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TIntrusivePtr spillingTaskCounters, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { @@ -91,6 +91,10 @@ class TDqChannelStorage : public IDqChannelStorage { return true; } + void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) override { + ActorSystem_->Send(ChannelStorageActorId_, new TEvDqChannelSpilling::TEvSetWakeUpCallback(std::move(wakeUpCallback))); + } + private: void UpdateWriteStatus() { for (auto it = WritingBlobs_.begin(); it != WritingBlobs_.end();) { diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp index c417bf5d5a7b..609a330f6a32 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -35,7 +35,7 @@ namespace { LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); #define LOG_T(s) \ - LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); + LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); class TDqChannelStorageActor : public IDqChannelStorageActor, public NActors::TActorBootstrapped @@ -98,8 +98,9 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); hFunc(TEvDqSpilling::TEvReadResult, HandleWork); hFunc(TEvDqSpilling::TEvError, HandleWork); - hFunc(TEvDqChannelSpilling::TEvGet, HandleWork); hFunc(TEvDqChannelSpilling::TEvPut, HandleWork); + hFunc(TEvDqChannelSpilling::TEvGet, HandleWork); + hFunc(TEvDqChannelSpilling::TEvSetWakeUpCallback, HandleWork); cFunc(TEvents::TEvPoison::EventType, PassAway); default: Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", @@ -108,30 +109,33 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, } } + void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvPut] blobId: " << msg.BlobId_); + + auto opBegin = TInstant::Now(); + + auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin}; + WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo)); + SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_))); + } void HandleWork(TEvDqChannelSpilling::TEvGet::TPtr& ev) { auto& msg = *ev->Get(); LOG_T("[TEvGet] blobId: " << msg.BlobId_); auto opBegin = TInstant::Now(); - + auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin}; LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo)); SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_)); } - void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) { + void HandleWork(TEvDqChannelSpilling::TEvSetWakeUpCallback::TPtr& ev) { auto& msg = *ev->Get(); - LOG_T("[TEvPut] blobId: " << msg.BlobId_); - - auto opBegin = TInstant::Now(); - - auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.Size(), std::move(msg.Promise_), opBegin}; - WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo)); - - SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_))); + WakeUpCallback_ = std::move(msg.WakeUpCallback_); } void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { @@ -203,7 +207,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, // BlobId -> blob size + promise that blob is saved std::unordered_map WritingBlobs_; - + // BlobId -> promise with requested blob std::unordered_map LoadingBlobs_; diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h index 88722e84e557..2a2ed7a73bdd 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h @@ -14,7 +14,8 @@ namespace NYql::NDq { struct TDqChannelStorageActorEvents { enum { EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30100, - EvGet + EvGet, + EvSetWakeUpCallback }; }; @@ -42,6 +43,15 @@ struct TEvDqChannelSpilling { ui64 BlobId_; NThreading::TPromise Promise_; }; + + struct TEvSetWakeUpCallback : NActors::TEventLocal { + TEvSetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) + : WakeUpCallback_(std::move(wakeUpCallback)) + { + } + + TWakeUpCallback WakeUpCallback_; + }; }; class IDqChannelStorageActor diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h index f94d02a5fad9..09dcc815921e 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_storage.h +++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h @@ -14,6 +14,7 @@ class TDqChannelStorageException : public yexception { class IDqChannelStorage : public TSimpleRefCount { public: using TPtr = TIntrusivePtr; + using TWakeUpCallback = std::function; public: virtual ~IDqChannelStorage() = default; @@ -30,7 +31,9 @@ class IDqChannelStorage : public TSimpleRefCount { // It is better to replace Get() with Pull() which will delete blob after read // (current clients read each blob exactly once) // Get() will return false if data is not ready yet. Client should repeat Get() in this case - virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0; + virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0; + + virtual void SetWakeUpCallback(TWakeUpCallback&& wakeUpCallback) = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/ut/ut_helper.h b/ydb/library/yql/dq/runtime/ut/ut_helper.h index e932b406f2dc..fc4809fe1eb9 100644 --- a/ydb/library/yql/dq/runtime/ut/ut_helper.h +++ b/ydb/library/yql/dq/runtime/ut/ut_helper.h @@ -59,6 +59,10 @@ class TMockChannelStorage : public IDqChannelStorage { return true; } + void SetWakeUpCallback(TWakeUpCallback&& /* wakeUpCallback */) override { + + } + public: void SetBlankGetRequests(ui32 count) { GetBlankRequests = count;