Skip to content

Commit 7bedfbf

Browse files
committed
fix: large message cache may not be recycled with multiple receivers
1 parent 130e4d6 commit 7bedfbf

File tree

4 files changed

+31
-79
lines changed

4 files changed

+31
-79
lines changed

demo/msg_que/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16;
2323
std::atomic<bool> is_quit__{ false };
2424
std::atomic<std::size_t> size_counter__{ 0 };
2525

26-
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
26+
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
2727

2828
msg_que_t que__{ name__ };
2929
ipc::byte_t buff__[max_sz];

src/ipc.cpp

Lines changed: 26 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ struct chunk_info_t {
9595
ipc::spin_lock lock_;
9696

9797
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
98-
return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t));
98+
return ipc::make_align(alignof(std::max_align_t), chunk_size);
9999
}
100100

101101
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
@@ -146,9 +146,7 @@ auto& chunk_storage(std::size_t chunk_size) {
146146
return chunk_storages()[chunk_size];
147147
}
148148

149-
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
150-
if (conn_count == 0) return {};
151-
149+
std::pair<std::size_t, void*> apply_storage(std::size_t size) {
152150
std::size_t chunk_size = calc_chunk_size(size);
153151
auto & chunk_shm = chunk_storage(chunk_size);
154152

@@ -161,88 +159,46 @@ std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t
161159
auto id = info->pool_.acquire();
162160
info->lock_.unlock();
163161

164-
auto ptr = info->at(chunk_size, id);
165-
if (ptr == nullptr) return {};
166-
reinterpret_cast<acc_t*>(ptr + chunk_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release);
167-
return { id, ptr };
162+
return { id, info->at(chunk_size, id) };
168163
}
169164

170165
void *find_storage(std::size_t id, std::size_t size) {
171166
if (id == ipc::invalid_value) {
172-
ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size);
167+
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
173168
return nullptr;
174169
}
175-
176170
std::size_t chunk_size = calc_chunk_size(size);
177171
auto & chunk_shm = chunk_storage(chunk_size);
178-
179172
auto info = chunk_shm.get_info(chunk_size);
180173
if (info == nullptr) return nullptr;
181-
182-
auto ptr = info->at(chunk_size, id);
183-
if (ptr == nullptr) return nullptr;
184-
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->load(std::memory_order_acquire) == 0) {
185-
ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size);
186-
return nullptr;
187-
}
188-
return ptr;
174+
return info->at(chunk_size, id);
189175
}
190176

191-
void recycle_storage(std::size_t id, std::size_t size) {
177+
void clear_storage(std::size_t id, std::size_t size) {
192178
if (id == ipc::invalid_value) {
193-
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size);
179+
ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
194180
return;
195181
}
196182

197183
std::size_t chunk_size = calc_chunk_size(size);
198184
auto & chunk_shm = chunk_storage(chunk_size);
199-
200185
auto info = chunk_shm.get_info(chunk_size);
201186
if (info == nullptr) return;
202187

203-
auto ptr = info->at(chunk_size, id);
204-
if (ptr == nullptr) {
205-
ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size);
206-
return;
207-
}
208-
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
209-
// not the last receiver, just return
210-
return;
211-
}
212-
213188
info->lock_.lock();
214189
info->pool_.release(id);
215190
info->lock_.unlock();
216191
}
217192

218-
void clear_storage(std::size_t id, std::size_t size) {
219-
if (id == ipc::invalid_value) {
220-
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size);
221-
return;
193+
template <typename MsgT>
194+
bool recycle_message(void* p) {
195+
auto msg = static_cast<MsgT*>(p);
196+
if (msg->storage_) {
197+
clear_storage(
198+
*reinterpret_cast<std::size_t*>(&msg->data_),
199+
static_cast<std::int32_t>(ipc::data_length) + msg->remain_);
222200
}
223-
224-
std::size_t chunk_size = calc_chunk_size(size);
225-
auto & chunk_shm = chunk_storage(chunk_size);
226-
227-
auto info = chunk_shm.get_info(chunk_size);
228-
if (info == nullptr) return;
229-
230-
auto ptr = info->at(chunk_size, id);
231-
if (ptr == nullptr) return;
232-
233-
auto cc_flag = reinterpret_cast<acc_t*>(ptr + chunk_size);
234-
for (unsigned k = 0;;) {
235-
auto cc_curr = cc_flag->load(std::memory_order_acquire);
236-
if (cc_curr == 0) return; // means this id has been cleared
237-
if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) {
238-
break;
239-
}
240-
ipc::yield(k);
241-
}
242-
243-
info->lock_.lock();
244-
info->pool_.release(id);
245-
info->lock_.unlock();
201+
return true;
246202
}
247203

248204
struct conn_info_head {
@@ -432,7 +388,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
432388
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
433389
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
434390
if (size > ipc::large_msg_limit) {
435-
auto dat = apply_storage(que->conn_count(), size);
391+
auto dat = apply_storage(size);
436392
void * buf = dat.second;
437393
if (buf != nullptr) {
438394
std::memcpy(buf, data, size);
@@ -466,18 +422,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
466422
return send([tm](auto info, auto que, auto msg_id) {
467423
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
468424
if (!wait_for(info->wt_waiter_, [&] {
469-
return !que->push(info->cc_id_, msg_id, remain, data, size);
425+
return !que->push(
426+
recycle_message<typename queue_t::value_t>,
427+
info->cc_id_, msg_id, remain, data, size);
470428
}, tm)) {
471429
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
472-
if (!que->force_push([](void* p) {
473-
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
474-
if (tmp_msg->storage_) {
475-
clear_storage(
476-
*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
477-
static_cast<std::int32_t>(ipc::data_length) + tmp_msg->remain_);
478-
}
479-
return true;
480-
}, info->cc_id_, msg_id, remain, data, size)) {
430+
if (!que->force_push(
431+
recycle_message<typename queue_t::value_t>,
432+
info->cc_id_, msg_id, remain, data, size)) {
481433
return false;
482434
}
483435
}
@@ -491,7 +443,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
491443
return send([tm](auto info, auto que, auto msg_id) {
492444
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
493445
if (!wait_for(info->wt_waiter_, [&] {
494-
return !que->push(info->cc_id_, msg_id, remain, data, size);
446+
return !que->push(
447+
recycle_message<typename queue_t::value_t>,
448+
info->cc_id_, msg_id, remain, data, size);
495449
}, tm)) {
496450
return false;
497451
}
@@ -535,9 +489,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
535489
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
536490
void * buf = find_storage(buf_id, remain);
537491
if (buf != nullptr) {
538-
return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) {
539-
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
540-
}, reinterpret_cast<void*>(buf_id + 1) };
492+
return ipc::buff_t{buf, remain};
541493
}
542494
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
543495
}

src/libipc/queue.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,11 @@ class queue_base : public queue_conn {
155155
return !valid() || (cursor_ == elems_->cursor());
156156
}
157157

158-
template <typename T, typename... P>
159-
bool push(P&&... params) {
158+
template <typename T, typename F, typename... P>
159+
bool push(F&& prep, P&&... params) {
160160
if (elems_ == nullptr) return false;
161161
return elems_->push(this, [&](void* p) {
162-
::new (p) T(std::forward<P>(params)...);
162+
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
163163
});
164164
}
165165

test/test_queue.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ constexpr int ThreadMax = 8;
4444

4545
template <typename Que>
4646
void push(Que & que, int p, int d) {
47-
for (int n = 0; !que.push(p, d); ++n) {
47+
for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) {
4848
ASSERT_NE(n, PushRetry);
4949
std::this_thread::yield();
5050
}

0 commit comments

Comments
 (0)