@@ -70,7 +70,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
7070 auto sessionIter = ServerSessions.find (key);
7171 if (sessionIter.IsEnd ()) {
7272 PQ_CPROXY_LOG_D (" unknown session id '" << key.SessionId << " ', close session" );
73- CloseSession (ev->Sender , Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST, " Unknown session" );
73+ CloseSession (ev->Sender , key. SessionId , Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST, " Unknown session" );
7474 return ;
7575 }
7676
@@ -338,7 +338,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
338338 if (nextData == sessionIter->second .Reads .end ()) {
339339 return false ;
340340 }
341- auto result = SendData (sessionIter->first .PartitionSessionId , client, nextData->first , nextData->second );
341+ auto result = SendData (sessionIter->first .SessionId , sessionIter-> first . PartitionSessionId , client, nextData->first , nextData->second );
342342 ChangeCounterValue (" SendDataRate" , 1 , false , true );
343343 if (!result) {
344344 // ToDo: for discuss. Error in parsing partition response - shall we kill the entire session or just the partition session?
@@ -350,7 +350,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
350350 }
351351
352352 [[nodiscard]] bool SendData (
353- ui64 partSessionId, TCacheClientContext& proxyClient, ui64 readId, const std::shared_ptr<NKikimrClient::TResponse>& response
353+ const TString& sessionId, ui64 partSessionId, TCacheClientContext& proxyClient, ui64 readId, const std::shared_ptr<NKikimrClient::TResponse>& response
354354 ) {
355355 const auto & ctx = ActorContext ();
356356 auto message = std::make_shared<StreamDirectReadMessage::FromServer>();
@@ -359,7 +359,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
359359 directReadMessage->set_partition_session_id (partSessionId);
360360 directReadMessage->set_bytes_size (response->GetPartitionResponse ().GetCmdPrepareReadResult ().GetBytesSizeEstimate ());
361361
362- auto ok = VaildatePartitionResponse (proxyClient, *response);
362+ auto ok = VaildatePartitionResponse (sessionId, proxyClient, * response);
363363 if (!ok) {
364364 return false ;
365365 }
@@ -368,20 +368,21 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
368368 partSessionId);
369369 message->set_status (Ydb::StatusIds::SUCCESS);
370370
371- PQ_CPROXY_LOG_D (" send data to client. AssignId : " << partSessionId << " , readId: " << readId);
371+ PQ_CPROXY_LOG_D (" send data to client " << sessionId << " , assignId : " << partSessionId << " , readId: " << readId);
372372
373373 ctx.Send (proxyClient.ProxyId , new TEvPQProxy::TEvDirectReadSendClientData (std::move (message)));
374374 return true ;
375375 }
376376
377377 void CloseSession (
378378 const TActorId& proxyId,
379+ const TString& sessionId,
379380 Ydb::PersQueue::ErrorCode::ErrorCode code,
380381 const TString& reason
381382 ) {
382383 const auto & ctx = ActorContext ();
383384 ctx.Send (proxyId, new TEvPQProxy::TEvDirectReadCloseSession (code, reason));
384- PQ_CPROXY_LOG_D (" close session for proxy " << proxyId.ToString ());
385+ PQ_CPROXY_LOG_D (" close session for proxy " << proxyId.ToString () << " , sessionId: " << sessionId );
385386 }
386387
387388 bool DestroyPartitionSession (
@@ -395,7 +396,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
395396 ctx.Send (
396397 sessionIter->second .Client ->ProxyId , new TEvPQProxy::TEvDirectReadDestroyPartitionSession (sessionIter->first , code, reason)
397398 );
398- PQ_CPROXY_LOG_D (" close session for proxy " << sessionIter->second .Client ->ProxyId .ToString ());
399+ PQ_CPROXY_LOG_D (" DestroyPartitionSession, sessionId: " << sessionIter-> first . SessionId << " , proxy: " << sessionIter->second .Client ->ProxyId .ToString ());
399400 return true ;
400401 }
401402
@@ -412,11 +413,12 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
412413 }
413414
414415 bool VaildatePartitionResponse (
415- TCacheClientContext& proxyClient, NKikimrClient::TResponse& response
416+ const TString& sessionId, TCacheClientContext& proxyClient, NKikimrClient::TResponse& response
416417 ) {
417418 if (response.HasErrorCode () && response.GetErrorCode () != NPersQueue::NErrorCode::OK) {
418419 CloseSession (
419420 proxyClient.ProxyId ,
421+ sessionId,
420422 NGRpcProxy::V1::ConvertOldCode (response.GetErrorCode ()),
421423 " Status is not ok: " + response.GetErrorReason ()
422424 );
@@ -426,6 +428,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
426428 if (response.GetStatus () != NKikimr::NMsgBusProxy::MSTATUS_OK) { // this is incorrect answer, die
427429 CloseSession (
428430 proxyClient.ProxyId ,
431+ sessionId,
429432 Ydb::PersQueue::ErrorCode::ERROR,
430433 " Status is not ok: " + response.GetErrorReason ()
431434 );
@@ -434,6 +437,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
434437 if (!response.HasPartitionResponse ()) { // this is incorrect answer, die
435438 CloseSession (
436439 proxyClient.ProxyId ,
440+ sessionId,
437441 Ydb::PersQueue::ErrorCode::ERROR,
438442 " Direct read cache got empty partition response"
439443 );
@@ -444,6 +448,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
444448 if (!partResponse.HasCmdReadResult ()) { // this is incorrect answer, die
445449 CloseSession (
446450 proxyClient.ProxyId ,
451+ sessionId,
447452 Ydb::PersQueue::ErrorCode::ERROR,
448453 " Malformed response from partition"
449454 );
0 commit comments