@@ -50,15 +50,16 @@ struct msg_t : msg_t<0, AlignSize> {
5050 std::aligned_storage_t <DataSize, AlignSize> data_ {};
5151
5252 msg_t () = default ;
53- msg_t (msg_id_t c , msg_id_t i , std::int32_t r , void const * d , std::size_t s )
54- : msg_t <0 , AlignSize> { c, i, r , (d == nullptr ) || (s == 0 ) } {
53+ msg_t (msg_id_t conn , msg_id_t id , std::int32_t remain , void const * data , std::size_t size )
54+ : msg_t <0 , AlignSize> {conn, id, remain , (data == nullptr ) || (size == 0 )} {
5555 if (this ->storage_ ) {
56- if (d != nullptr ) {
56+ if (data != nullptr ) {
5757 // copy storage-id
58- *reinterpret_cast <std::size_t *>(&data_) = *static_cast <std::size_t const *>(d);
58+ *reinterpret_cast <ipc::storage_id_t *>(&data_) =
59+ *static_cast <ipc::storage_id_t const *>(data);
5960 }
6061 }
61- else std::memcpy (&data_, d, s );
62+ else std::memcpy (&data_, data, size );
6263 }
6364};
6465
@@ -94,17 +95,13 @@ struct chunk_info_t {
9495 ipc::id_pool<> pool_;
9596 ipc::spin_lock lock_;
9697
97- IPC_CONSTEXPR_ static std::size_t chunks_elem_size (std::size_t chunk_size) noexcept {
98- return ipc::make_align (alignof (std::max_align_t ), chunk_size);
99- }
100-
10198 IPC_CONSTEXPR_ static std::size_t chunks_mem_size (std::size_t chunk_size) noexcept {
102- return ipc::id_pool<>::max_count * chunks_elem_size ( chunk_size) ;
99+ return ipc::id_pool<>::max_count * chunk_size;
103100 }
104101
105- ipc::byte_t *at (std::size_t chunk_size, std:: size_t id) noexcept {
106- if (id == ipc::invalid_value ) return nullptr ;
107- return reinterpret_cast <ipc::byte_t *>(this + 1 ) + (chunks_elem_size ( chunk_size) * id);
102+ ipc::byte_t *at (std::size_t chunk_size, ipc:: storage_id_t id) noexcept {
103+ if (id < 0 ) return nullptr ;
104+ return reinterpret_cast <ipc::byte_t *>(this + 1 ) + (chunk_size * id);
108105 }
109106};
110107
@@ -128,29 +125,22 @@ auto& chunk_storages() {
128125 return info;
129126 }
130127 };
131- static ipc::unordered_map<std::size_t , chunk_t > chunk_s;
128+ thread_local ipc::unordered_map<std::size_t , chunk_t > chunk_s;
132129 return chunk_s;
133130}
134131
135- auto & chunk_lock () {
136- static ipc::spin_lock chunk_l;
137- return chunk_l ;
132+ IPC_CONSTEXPR_ std:: size_t calc_chunk_size (std:: size_t size) noexcept {
133+ return ipc::make_align ( alignof (std:: max_align_t ),
134+ (((size - 1 ) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align) ;
138135}
139136
140- constexpr std:: size_t calc_chunk_size (std::size_t size) noexcept {
141- return ( ((size - 1 ) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align ;
137+ chunk_info_t * chunk_storage_info (std::size_t chunk_size) {
138+ return chunk_storages ()[chunk_size]. get_info (chunk_size) ;
142139}
143140
144- auto & chunk_storage (std::size_t chunk_size) {
145- IPC_UNUSED_ auto guard = ipc::detail::unique_lock (chunk_lock ());
146- return chunk_storages ()[chunk_size];
147- }
148-
149- std::pair<std::size_t , void *> apply_storage (std::size_t size) {
141+ std::pair<ipc::storage_id_t , void *> apply_storage (std::size_t size) {
150142 std::size_t chunk_size = calc_chunk_size (size);
151- auto & chunk_shm = chunk_storage (chunk_size);
152-
153- auto info = chunk_shm.get_info (chunk_size);
143+ auto info = chunk_storage_info (chunk_size);
154144 if (info == nullptr ) return {};
155145
156146 info->lock_ .lock ();
@@ -162,27 +152,25 @@ std::pair<std::size_t, void*> apply_storage(std::size_t size) {
162152 return { id, info->at (chunk_size, id) };
163153}
164154
165- void *find_storage (std:: size_t id, std::size_t size) {
166- if (id == ipc::invalid_value ) {
155+ void *find_storage (ipc:: storage_id_t id, std::size_t size) {
156+ if (id < 0 ) {
167157 ipc::error (" [find_storage] id is invalid: id = %ld, size = %zd\n " , (long )id, size);
168158 return nullptr ;
169159 }
170160 std::size_t chunk_size = calc_chunk_size (size);
171- auto & chunk_shm = chunk_storage (chunk_size);
172- auto info = chunk_shm.get_info (chunk_size);
161+ auto info = chunk_storage_info (chunk_size);
173162 if (info == nullptr ) return nullptr ;
174163 return info->at (chunk_size, id);
175164}
176165
177- void clear_storage (std:: size_t id, std::size_t size) {
178- if (id == ipc::invalid_value ) {
166+ void release_storage (ipc:: storage_id_t id, std::size_t size) {
167+ if (id < 0 ) {
179168 ipc::error (" [clear_storage] id is invalid: id = %ld, size = %zd\n " , (long )id, size);
180169 return ;
181170 }
182171
183172 std::size_t chunk_size = calc_chunk_size (size);
184- auto & chunk_shm = chunk_storage (chunk_size);
185- auto info = chunk_shm.get_info (chunk_size);
173+ auto info = chunk_storage_info (chunk_size);
186174 if (info == nullptr ) return ;
187175
188176 info->lock_ .lock ();
@@ -194,9 +182,14 @@ template <typename MsgT>
194182bool recycle_message (void * p) {
195183 auto msg = static_cast <MsgT*>(p);
196184 if (msg->storage_ ) {
197- clear_storage (
198- *reinterpret_cast <std::size_t *>(&msg->data_ ),
199- static_cast <std::int32_t >(ipc::data_length) + msg->remain_ );
185+ std::int32_t r_size = static_cast <std::int32_t >(ipc::data_length) + msg->remain_ ;
186+ if (r_size <= 0 ) {
187+ ipc::error (" [recycle_message] invalid msg size: %d\n " , (int )r_size);
188+ return true ;
189+ }
190+ release_storage (
191+ *reinterpret_cast <ipc::storage_id_t *>(&msg->data_ ),
192+ static_cast <std::size_t >(r_size));
200193 }
201194 return true ;
202195}
@@ -396,11 +389,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
396389 static_cast <std::int32_t >(ipc::data_length), &(dat.first ), 0 );
397390 }
398391 // try using message fragment
399- // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
392+ // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
400393 }
401394 // push message fragment
402395 std::int32_t offset = 0 ;
403- for (int i = 0 ; i < static_cast <int >(size / ipc::data_length); ++i, offset += ipc::data_length) {
396+ for (std:: int32_t i = 0 ; i < static_cast <std:: int32_t >(size / ipc::data_length); ++i, offset += ipc::data_length) {
404397 if (!try_push (static_cast <std::int32_t >(size) - offset - static_cast <std::int32_t >(ipc::data_length),
405398 static_cast <ipc::byte_t const *>(data) + offset, ipc::data_length)) {
406399 return false ;
@@ -466,7 +459,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
466459 return {};
467460 }
468461 auto & rc = info_of (h)->recv_cache ();
469- while ( 1 ) {
462+ for (;; ) {
470463 // pop a new message
471464 typename queue_t ::value_t msg;
472465 if (!wait_for (info_of (h)->rd_waiter_ , [que, &msg] { return !que->pop (msg); }, tm)) {
@@ -478,20 +471,28 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
478471 continue ; // ignore message to self
479472 }
480473 // msg.remain_ may minus & abs(msg.remain_) < data_length
481- std::size_t remain = static_cast <std::int32_t >(ipc::data_length) + msg.remain_ ;
474+ std::int32_t r_size = static_cast <std::int32_t >(ipc::data_length) + msg.remain_ ;
475+ if (r_size <= 0 ) {
476+ ipc::error (" fail: recv, r_size = %d\n " , (int )r_size);
477+ return {};
478+ }
479+ std::size_t msg_size = static_cast <std::size_t >(r_size);
482480 // find cache with msg.id_
483481 auto cac_it = rc.find (msg.id_ );
484482 if (cac_it == rc.end ()) {
485- if (remain <= ipc::data_length) {
486- return make_cache (msg.data_ , remain );
483+ if (msg_size <= ipc::data_length) {
484+ return make_cache (msg.data_ , msg_size );
487485 }
488486 if (msg.storage_ ) {
489487 std::size_t buf_id = *reinterpret_cast <std::size_t *>(&msg.data_ );
490- void * buf = find_storage (buf_id, remain );
488+ void * buf = find_storage (buf_id, msg_size );
491489 if (buf != nullptr ) {
492- return ipc::buff_t {buf, remain};
490+ return ipc::buff_t {buf, msg_size};
491+ }
492+ else {
493+ ipc::log (" fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n " , msg.id_ , buf_id, msg_size);
494+ continue ;
493495 }
494- else ipc::log (" fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n " , msg.id_ , buf_id, remain);
495496 }
496497 // gc
497498 if (rc.size () > 1024 ) {
@@ -505,14 +506,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
505506 for (auto id : need_del) rc.erase (id);
506507 }
507508 // cache the first message fragment
508- rc.emplace (msg.id_ , cache_t { ipc::data_length, make_cache (msg.data_ , remain ) });
509+ rc.emplace (msg.id_ , cache_t { ipc::data_length, make_cache (msg.data_ , msg_size ) });
509510 }
510511 // has cached before this message
511512 else {
512513 auto & cac = cac_it->second ;
513514 // this is the last message fragment
514515 if (msg.remain_ <= 0 ) {
515- cac.append (&(msg.data_ ), remain );
516+ cac.append (&(msg.data_ ), msg_size );
516517 // finish this message, erase it from cache
517518 auto buff = std::move (cac.buff_ );
518519 rc.erase (cac_it);
0 commit comments