diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index dd9102302163..468772c5a33b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -406,6 +406,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } ProcessOutputsState.LastRunStatus = status; + ProcessOutputsState.LastRunTime = TInstant::Now(); for (auto& entry : OutputChannelsMap) { const ui64 channelId = entry.first; @@ -855,11 +856,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped protected: struct TInputChannelInfo { + ui32 InputIndex; TString LogPrefix; ui64 ChannelId; ui32 SrcStageId; IDqInputChannel::TPtr Channel; bool HasPeer = false; + NActors::TActorId PeerId; const NDqProto::EWatermarksMode WatermarksMode; const TDuration WatermarksIdleTimeout = TDuration::Max(); std::optional PendingCheckpoint; @@ -867,13 +870,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped i64 FreeSpace = 0; explicit TInputChannelInfo( + ui32 inputIndex, const TString& logPrefix, ui64 channelId, ui32 srcStageId, NDqProto::EWatermarksMode watermarksMode, NDqProto::ECheckpointingMode checkpointingMode, TDuration watermarksIdleTimeout) - : LogPrefix(logPrefix) + : InputIndex(inputIndex) + , LogPrefix(logPrefix) , ChannelId(channelId) , SrcStageId(srcStageId) , WatermarksMode(watermarksMode) @@ -919,6 +924,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ui32 DstStageId; IDqOutputChannel::TPtr Channel; bool HasPeer = false; + NActors::TActorId PeerId; bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints. bool EarlyFinish = false; bool PopStarted = false; @@ -1089,6 +1095,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped Channels->SetInputChannelPeer(channelUpdate.GetId(), peer); inputChannel->HasPeer = true; + inputChannel->PeerId = peer; continue; } @@ -1101,6 +1108,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped Channels->SetOutputChannelPeer(channelUpdate.GetId(), peer); outputChannel->HasPeer = true; + outputChannel->PeerId = peer; if (outputChannel->Channel) { outputChannel->Channel->UpdateSettings({.IsLocalChannel = peer.NodeId() == this->SelfId().NodeId()}); } @@ -1276,8 +1284,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped void MonitoringExtra(TStringStream&) { } - void OnMonitoringPage(NActors::NMon::TEvHttpInfo::TPtr& ev) { - TStringStream html; + void DumpForMonitoring(TStringStream& html) { static_cast(this)->MonitoringExtra(html); #define DUMP(P, X,...) html << #X ": " << P.X __VA_ARGS__ << "
" @@ -1533,8 +1540,181 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } #undef DUMP #undef DUMP_PREFIXED + } + + void DefaultMonitoringPage(TStringStream& str) { + HTML(str) { + PRE() { + str << "TDqComputeActorBase, SelfId=" << this->SelfId() << ' '; + HREF(TStringBuilder() << "?ca=" << this->SelfId() << "&view=dump") { + str << "Dump"; + } + str << ' '; + HREF(TStringBuilder() << "?ca=" << this->SelfId() << "&view=run") { + str << "Run"; + } + str << Endl; + + COLLAPSED_BUTTON_CONTENT("ProcessOutputsState", TStringBuilder() << "ProcessOutputsState: " << ProcessOutputsState.LastRunTime << ' ' << ProcessOutputsState.LastRunStatus) { + str << " Inflight: " << ProcessOutputsState.Inflight << Endl; + str << " ChannelsReady: " << ProcessOutputsState.ChannelsReady << Endl; + str << " HasDataToSend: " << ProcessOutputsState.HasDataToSend << Endl; + str << " DataWasSent: " << ProcessOutputsState.DataWasSent << Endl; + str << " AllOutputsFinished: " << ProcessOutputsState.AllOutputsFinished << Endl; + str << " LastRunStatus: " << ProcessOutputsState.LastRunStatus << Endl; + str << " LastRunTime: " << ProcessOutputsState.LastRunTime << Endl; + str << " LastPopReturnedNoData: " << ProcessOutputsState.LastPopReturnedNoData << Endl; + } + + str << Endl << "Input Channels:" << Endl; + TABLE_SORTABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH_ATTRS({{"title", "ChannelId"}}) {str << "Id";} + TABLEH_ATTRS({{"title", "SrcStageId"}}) {str << "Src";} + TABLEH_ATTRS({{"title", "InputIndex"}}) {str << "Idx";} + TABLEH() {str << "PeerId";} + TABLEH_ATTRS({{"title", "IsFinished"}}) {str << "F";} + TABLEH() {str << "Push.Bytes";} + TABLEH() {str << "Push.Rows";} + TABLEH() {str << "Pop.Bytes";} + TABLEH() {str << "Pop.Rows";} + } + } + TABLEBODY() { + for (const auto& [id, info]: InputChannelsMap) { + TABLER() { + TABLED() {str << info.ChannelId;} + TABLED() {str << info.SrcStageId;} + TABLED() {str << info.InputIndex;} + TABLED() { + if (info.HasPeer) { + HREF(TStringBuilder() << "/node/" << info.PeerId.NodeId() << "/actors/kqp_node?ca=" << info.PeerId) { + str << info.PeerId; + } + } else { + str << "N/A"; + } + } - this->Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str())); + auto channel = info.Channel; + if (!channel) { + auto stats = GetTaskRunnerStats(); + if (stats) { + auto stageIt = stats->InputChannels.find(info.SrcStageId); + if (stageIt != stats->InputChannels.end()) { + auto channelIt = stageIt->second.find(info.ChannelId); + if (channelIt != stageIt->second.end()) { + channel = channelIt->second; + } + } + } + } + + if (channel) { + TABLED() {str << channel->IsFinished();} + auto& pushStats = channel->GetPushStats(); + TABLED() {str << pushStats.Bytes;} + TABLED() {str << pushStats.Rows;} + auto& popStats = channel->GetPopStats(); + TABLED() {str << popStats.Bytes;} + TABLED() {str << popStats.Rows;} + } else { + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + } + } + } + } + } + + str << Endl << "Output Channels:" << Endl; + TABLE_SORTABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH_ATTRS({{"title", "ChannelId"}}) {str << "Id";} + TABLEH_ATTRS({{"title", "DstStageId"}}) {str << "Dst";} + TABLEH() {str << "PeerId";} + TABLEH_ATTRS({{"title", "Finished"}}) {str << "F";} + TABLEH_ATTRS({{"title", "EarlyFinish"}}) {str << "EF";} + TABLEH() {str << "Push.Bytes";} + TABLEH() {str << "Push.Rows";} + TABLEH() {str << "Pop.Bytes";} + TABLEH() {str << "Pop.Rows";} + } + } + TABLEBODY() { + for (const auto& [id, info]: OutputChannelsMap) { + TABLER() { + TABLED() {str << info.ChannelId;} + TABLED() {str << info.DstStageId;} + TABLED() { + if (info.HasPeer) { + HREF(TStringBuilder() << "/node/" << info.PeerId.NodeId() << "/actors/kqp_node?ca=" << info.PeerId) { + str << info.PeerId; + } + } else { + str << "N/A"; + } + } + TABLED() {str << info.Finished;} + TABLED() {str << info.EarlyFinish;} + + auto channel = info.Channel; + if (!channel) { + auto stats = GetTaskRunnerStats(); + if (stats) { + auto stageIt = stats->OutputChannels.find(info.DstStageId); + if (stageIt != stats->OutputChannels.end()) { + auto channelIt = stageIt->second.find(info.ChannelId); + if (channelIt != stageIt->second.end()) { + channel = channelIt->second; + } + } + } + } + + if (channel) { + auto& pushStats = channel->GetPushStats(); + TABLED() {str << pushStats.Bytes;} + TABLED() {str << pushStats.Rows;} + auto& popStats = channel->GetPopStats(); + TABLED() {str << popStats.Bytes;} + TABLED() {str << popStats.Rows;} + } else { + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + TABLED() {str << "N/A";} + } + } + } + } + } + } + } + } + + void OnMonitoringPage(NActors::NMon::TEvHttpInfo::TPtr& ev) { + TStringStream str; + + const TCgiParameters &cgi = ev->Get()->Request.GetParams(); + auto view = cgi.Get("view"); + if (view == "dump") { + DumpForMonitoring(str); + } else if (view == "run") { + if (this->Running) { + this->DoExecute(); + } + DefaultMonitoringPage(str); + } else { + DefaultMonitoringPage(str); + } + + this->Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(str.Str())); } protected: @@ -1930,6 +2110,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped auto result = InputChannelsMap.emplace( channel.GetId(), TInputChannelInfo( + i, LogPrefix, channel.GetId(), channel.GetSrcStageId(), @@ -1967,7 +2148,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped for (auto& channel : outputDesc.GetChannels()) { TOutputChannelInfo outputChannel(channel.GetId(), channel.GetDstStageId()); outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId(); - outputChannel.IsTransformOutput = outputDesc.HasTransform(); + if (outputChannel.HasPeer) { + outputChannel.PeerId = NActors::ActorIdFromProto(channel.GetDstEndpoint().GetActorId()); + } + outputChannel.IsTransformOutput = outputDesc.HasTransform(); outputChannel.WatermarksMode = channel.GetWatermarksMode(); if (Y_UNLIKELY(RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) { @@ -2366,6 +2550,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped bool DataWasSent = false; bool AllOutputsFinished = true; ERunStatus LastRunStatus = ERunStatus::PendingInput; + TInstant LastRunTime; bool LastPopReturnedNoData = false; }; TProcessOutputsState ProcessOutputsState; diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp index 3c4b5821ad8f..0d51091006b4 100644 --- a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp @@ -45,6 +45,9 @@ struct TMockHttpRequest : NMonitoring::IMonHttpRequest { TStringStream Out; TCgiParameters Params; THttpHeaders Headers; + TMockHttpRequest() { + Params.Scan("view=dump"); + } IOutputStream& Output() override { return Out; } @@ -639,8 +642,8 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { } void DumpMonPage(auto asyncCA, auto hook) { + TMockHttpRequest request; { - TMockHttpRequest request; auto evHttpInfo = MakeHolder(request); ActorSystem.Send(asyncCA, EdgeActor, evHttpInfo.Release()); } diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_sync_compute_actor_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_sync_compute_actor_ut.cpp index 03073b8a2e56..02175dcf1fc8 100644 --- a/ydb/library/yql/dq/actors/compute/ut/dq_sync_compute_actor_ut.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/dq_sync_compute_actor_ut.cpp @@ -53,6 +53,9 @@ struct TMockHttpRequest : NMonitoring::IMonHttpRequest { TStringStream Out; TCgiParameters Params; THttpHeaders Headers; + TMockHttpRequest() { + Params.Scan("view=dump"); + } IOutputStream& Output() override { return Out; } @@ -654,8 +657,8 @@ struct TSyncComputeActorTestFixture: public NUnitTest::TBaseFixture { } void DumpMonPage(auto syncCA, auto hook) { + TMockHttpRequest request; { - TMockHttpRequest request; auto evHttpInfo = MakeHolder(request); ActorSystem.Send(syncCA, EdgeActor, evHttpInfo.Release()); }