Skip to content

Commit bbd2419

Browse files
committed
fix: large message cache may not be recycled with multiple receivers
1 parent 57e5298 commit bbd2419

File tree

4 files changed

+31
-79
lines changed

4 files changed

+31
-79
lines changed

demo/msg_que/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16;
2323
std::atomic<bool> is_quit__{ false };
2424
std::atomic<std::size_t> size_counter__{ 0 };
2525

26-
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
26+
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
2727

2828
msg_que_t que__{ name__ };
2929
ipc::byte_t buff__[max_sz];

src/ipc.cpp

Lines changed: 26 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ struct chunk_info_t {
9696
ipc::spin_lock lock_;
9797

9898
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
99-
return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t));
99+
return ipc::make_align(alignof(std::max_align_t), chunk_size);
100100
}
101101

102102
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
@@ -147,9 +147,7 @@ auto& chunk_storage(std::size_t chunk_size) {
147147
return chunk_storages()[chunk_size];
148148
}
149149

150-
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
151-
if (conn_count == 0) return {};
152-
150+
std::pair<std::size_t, void*> apply_storage(std::size_t size) {
153151
std::size_t chunk_size = calc_chunk_size(size);
154152
auto & chunk_shm = chunk_storage(chunk_size);
155153

@@ -162,88 +160,46 @@ std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t
162160
auto id = info->pool_.acquire();
163161
info->lock_.unlock();
164162

165-
auto ptr = info->at(chunk_size, id);
166-
if (ptr == nullptr) return {};
167-
reinterpret_cast<acc_t*>(ptr + chunk_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release);
168-
return { id, ptr };
163+
return { id, info->at(chunk_size, id) };
169164
}
170165

171166
void *find_storage(std::size_t id, std::size_t size) {
172167
if (id == ipc::invalid_value) {
173-
ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size);
168+
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
174169
return nullptr;
175170
}
176-
177171
std::size_t chunk_size = calc_chunk_size(size);
178172
auto & chunk_shm = chunk_storage(chunk_size);
179-
180173
auto info = chunk_shm.get_info(chunk_size);
181174
if (info == nullptr) return nullptr;
182-
183-
auto ptr = info->at(chunk_size, id);
184-
if (ptr == nullptr) return nullptr;
185-
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->load(std::memory_order_acquire) == 0) {
186-
ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size);
187-
return nullptr;
188-
}
189-
return ptr;
175+
return info->at(chunk_size, id);
190176
}
191177

192-
void recycle_storage(std::size_t id, std::size_t size) {
178+
void clear_storage(std::size_t id, std::size_t size) {
193179
if (id == ipc::invalid_value) {
194-
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size);
180+
ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
195181
return;
196182
}
197183

198184
std::size_t chunk_size = calc_chunk_size(size);
199185
auto & chunk_shm = chunk_storage(chunk_size);
200-
201186
auto info = chunk_shm.get_info(chunk_size);
202187
if (info == nullptr) return;
203188

204-
auto ptr = info->at(chunk_size, id);
205-
if (ptr == nullptr) {
206-
ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size);
207-
return;
208-
}
209-
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
210-
// not the last receiver, just return
211-
return;
212-
}
213-
214189
info->lock_.lock();
215190
info->pool_.release(id);
216191
info->lock_.unlock();
217192
}
218193

219-
void clear_storage(std::size_t id, std::size_t size) {
220-
if (id == ipc::invalid_value) {
221-
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size);
222-
return;
194+
template <typename MsgT>
195+
bool recycle_message(void* p) {
196+
auto msg = static_cast<MsgT*>(p);
197+
if (msg->storage_) {
198+
clear_storage(
199+
*reinterpret_cast<std::size_t*>(&msg->data_),
200+
static_cast<std::int32_t>(ipc::data_length) + msg->remain_);
223201
}
224-
225-
std::size_t chunk_size = calc_chunk_size(size);
226-
auto & chunk_shm = chunk_storage(chunk_size);
227-
228-
auto info = chunk_shm.get_info(chunk_size);
229-
if (info == nullptr) return;
230-
231-
auto ptr = info->at(chunk_size, id);
232-
if (ptr == nullptr) return;
233-
234-
auto cc_flag = reinterpret_cast<acc_t*>(ptr + chunk_size);
235-
for (unsigned k = 0;;) {
236-
auto cc_curr = cc_flag->load(std::memory_order_acquire);
237-
if (cc_curr == 0) return; // means this id has been cleared
238-
if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) {
239-
break;
240-
}
241-
ipc::yield(k);
242-
}
243-
244-
info->lock_.lock();
245-
info->pool_.release(id);
246-
info->lock_.unlock();
202+
return true;
247203
}
248204

249205
struct conn_info_head {
@@ -445,7 +401,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
445401
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
446402
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
447403
if (size > ipc::large_msg_limit) {
448-
auto dat = apply_storage(que->conn_count(), size);
404+
auto dat = apply_storage(size);
449405
void * buf = dat.second;
450406
if (buf != nullptr) {
451407
std::memcpy(buf, data, size);
@@ -479,18 +435,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
479435
return send([tm](auto info, auto que, auto msg_id) {
480436
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
481437
if (!wait_for(info->wt_waiter_, [&] {
482-
return !que->push(info->cc_id_, msg_id, remain, data, size);
438+
return !que->push(
439+
recycle_message<typename queue_t::value_t>,
440+
info->cc_id_, msg_id, remain, data, size);
483441
}, tm)) {
484442
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
485-
if (!que->force_push([](void* p) {
486-
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
487-
if (tmp_msg->storage_) {
488-
clear_storage(
489-
*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
490-
static_cast<std::int32_t>(ipc::data_length) + tmp_msg->remain_);
491-
}
492-
return true;
493-
}, info->cc_id_, msg_id, remain, data, size)) {
443+
if (!que->force_push(
444+
recycle_message<typename queue_t::value_t>,
445+
info->cc_id_, msg_id, remain, data, size)) {
494446
return false;
495447
}
496448
}
@@ -504,7 +456,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
504456
return send([tm](auto info, auto que, auto msg_id) {
505457
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
506458
if (!wait_for(info->wt_waiter_, [&] {
507-
return !que->push(info->cc_id_, msg_id, remain, data, size);
459+
return !que->push(
460+
recycle_message<typename queue_t::value_t>,
461+
info->cc_id_, msg_id, remain, data, size);
508462
}, tm)) {
509463
return false;
510464
}
@@ -548,9 +502,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
548502
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
549503
void * buf = find_storage(buf_id, remain);
550504
if (buf != nullptr) {
551-
return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) {
552-
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
553-
}, reinterpret_cast<void*>(buf_id + 1) };
505+
return ipc::buff_t{buf, remain};
554506
}
555507
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
556508
}

src/libipc/queue.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,11 @@ class queue_base : public queue_conn {
155155
return !valid() || (cursor_ == elems_->cursor());
156156
}
157157

158-
template <typename T, typename... P>
159-
bool push(P&&... params) {
158+
template <typename T, typename F, typename... P>
159+
bool push(F&& prep, P&&... params) {
160160
if (elems_ == nullptr) return false;
161161
return elems_->push(this, [&](void* p) {
162-
::new (p) T(std::forward<P>(params)...);
162+
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
163163
});
164164
}
165165

test/test_queue.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ constexpr int ThreadMax = 8;
4444

4545
template <typename Que>
4646
void push(Que & que, int p, int d) {
47-
for (int n = 0; !que.push(p, d); ++n) {
47+
for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) {
4848
ASSERT_NE(n, PushRetry);
4949
std::this_thread::yield();
5050
}

0 commit comments

Comments
 (0)