Skip to content

Commit 628914d

Browse files
committed
try to adjust recycling strategy for large message cache
1 parent 98a3449 commit 628914d

File tree

4 files changed

+82
-57
lines changed

4 files changed

+82
-57
lines changed

src/ipc.cpp

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131

3232
namespace {
3333

34-
using msg_id_t = std::uint32_t;
35-
using acc_t = std::atomic<msg_id_t>;
34+
using msg_id_t = std::uint32_t;
35+
using acc_t = std::atomic<msg_id_t>;
3636

3737
template <std::size_t DataSize, std::size_t AlignSize>
3838
struct msg_t;
@@ -91,6 +91,14 @@ auto cc_acc() {
9191
return static_cast<acc_t*>(acc_h.get());
9292
}
9393

94+
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
95+
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
96+
}
97+
98+
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
99+
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size));
100+
}
101+
94102
struct chunk_info_t {
95103
ipc::id_pool<> pool_;
96104
ipc::spin_lock lock_;
@@ -99,9 +107,13 @@ struct chunk_info_t {
99107
return ipc::id_pool<>::max_count * chunk_size;
100108
}
101109

102-
ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
103-
if (id < 0) return nullptr;
104-
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunk_size * id);
110+
ipc::byte_t* chunks_mem() noexcept {
111+
return reinterpret_cast<ipc::byte_t*>(this + 1);
112+
}
113+
114+
ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
115+
assert(id >= 0);
116+
return chunks_mem() + (chunk_size * id);
105117
}
106118
};
107119

@@ -129,16 +141,11 @@ auto& chunk_storages() {
129141
return chunk_s;
130142
}
131143

132-
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
133-
return ipc::make_align(alignof(std::max_align_t),
134-
(((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align);
135-
}
136-
137144
chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
138145
return chunk_storages()[chunk_size].get_info(chunk_size);
139146
}
140147

141-
std::pair<ipc::storage_id_t, void*> apply_storage(std::size_t size) {
148+
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size) {
142149
std::size_t chunk_size = calc_chunk_size(size);
143150
auto info = chunk_storage_info(chunk_size);
144151
if (info == nullptr) return {};
@@ -165,14 +172,12 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) {
165172

166173
void release_storage(ipc::storage_id_t id, std::size_t size) {
167174
if (id < 0) {
168-
ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
175+
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
169176
return;
170177
}
171-
172178
std::size_t chunk_size = calc_chunk_size(size);
173179
auto info = chunk_storage_info(chunk_size);
174180
if (info == nullptr) return;
175-
176181
info->lock_.lock();
177182
info->pool_.release(id);
178183
info->lock_.unlock();
@@ -381,7 +386,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
381386
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
382387
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
383388
if (size > ipc::large_msg_limit) {
384-
auto dat = apply_storage(size);
389+
auto dat = acquire_storage(size);
385390
void * buf = dat.second;
386391
if (buf != nullptr) {
387392
std::memcpy(buf, data, size);
@@ -416,7 +421,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
416421
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
417422
if (!wait_for(info->wt_waiter_, [&] {
418423
return !que->push(
419-
recycle_message<typename queue_t::value_t>,
424+
[](void*) { return true; },
420425
info->cc_id_, msg_id, remain, data, size);
421426
}, tm)) {
422427
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
@@ -437,7 +442,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
437442
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
438443
if (!wait_for(info->wt_waiter_, [&] {
439444
return !que->push(
440-
recycle_message<typename queue_t::value_t>,
445+
[](void*) { return true; },
441446
info->cc_id_, msg_id, remain, data, size);
442447
}, tm)) {
443448
return false;
@@ -462,7 +467,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
462467
for (;;) {
463468
// pop a new message
464469
typename queue_t::value_t msg;
465-
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
470+
bool recycled = false;
471+
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] {
472+
return !que->pop(msg, [&recycled](bool r) { recycled = r; });
473+
}, tm)) {
466474
// pop failed, just return.
467475
return {};
468476
}
@@ -477,23 +485,29 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
477485
return {};
478486
}
479487
std::size_t msg_size = static_cast<std::size_t>(r_size);
488+
// large message
489+
if (msg.storage_) {
490+
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
491+
void* buf = find_storage(buf_id, msg_size);
492+
if (buf != nullptr) {
493+
if (recycled) {
494+
return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) {
495+
release_storage(reinterpret_cast<ipc::storage_id_t>(pmid) - 1, size);
496+
}, reinterpret_cast<void*>(buf_id + 1)};
497+
} else {
498+
return ipc::buff_t{buf, msg_size}; // no recycle
499+
}
500+
} else {
501+
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
502+
continue;
503+
}
504+
}
480505
// find cache with msg.id_
481506
auto cac_it = rc.find(msg.id_);
482507
if (cac_it == rc.end()) {
483508
if (msg_size <= ipc::data_length) {
484509
return make_cache(msg.data_, msg_size);
485510
}
486-
if (msg.storage_) {
487-
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
488-
void * buf = find_storage(buf_id, msg_size);
489-
if (buf != nullptr) {
490-
return ipc::buff_t{buf, msg_size};
491-
}
492-
else {
493-
ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
494-
continue;
495-
}
496-
}
497511
// gc
498512
if (rc.size() > 1024) {
499513
std::vector<msg_id_t> need_del;

src/libipc/circ/elem_array.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ class elem_array : public ipc::circ::conn_head<Policy> {
130130
return head_.force_push(que, std::forward<F>(f), block_);
131131
}
132132

133-
template <typename Q, typename F>
134-
bool pop(Q* que, cursor_t* cur, F&& f) {
133+
template <typename Q, typename F, typename R>
134+
bool pop(Q* que, cursor_t* cur, F&& f, R&& out) {
135135
if (cur == nullptr) return false;
136-
return head_.pop(que, *cur, std::forward<F>(f), block_);
136+
return head_.pop(que, *cur, std::forward<F>(f), std::forward<R>(out), block_);
137137
}
138138
};
139139

src/libipc/prod_cons.h

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,14 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
5858
return false;
5959
}
6060

61-
template <typename W, typename F, typename E>
62-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) {
61+
template <typename W, typename F, typename R, typename E>
62+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) {
6363
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
6464
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
6565
return false; // empty
6666
}
6767
std::forward<F>(f)(&(elems[cur_rd].data_));
68+
std::forward<R>(out)(true);
6869
rd_.fetch_add(1, std::memory_order_release);
6970
return true;
7071
}
@@ -80,8 +81,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
8081
return false;
8182
}
8283

83-
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
84-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
84+
template <typename W, typename F, typename R,
85+
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
86+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
8587
byte_t buff[DS];
8688
for (unsigned k = 0;;) {
8789
auto cur_rd = rd_.load(std::memory_order_relaxed);
@@ -92,6 +94,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
9294
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
9395
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
9496
std::forward<F>(f)(buff);
97+
std::forward<R>(out)(true);
9598
return true;
9699
}
97100
ipc::yield(k);
@@ -156,8 +159,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
156159
return false;
157160
}
158161

159-
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
160-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
162+
template <typename W, typename F, typename R,
163+
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
164+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
161165
byte_t buff[DS];
162166
for (unsigned k = 0;;) {
163167
auto cur_rd = rd_.load(std::memory_order_relaxed);
@@ -179,6 +183,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
179183
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
180184
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
181185
std::forward<F>(f)(buff);
186+
std::forward<R>(out)(true);
182187
return true;
183188
}
184189
ipc::yield(k);
@@ -263,20 +268,20 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
263268
return true;
264269
}
265270

266-
template <typename W, typename F, typename E>
267-
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) {
271+
template <typename W, typename F, typename R, typename E>
272+
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
268273
if (cur == cursor()) return false; // acquire
269274
auto* el = elems + circ::index_of(cur++);
270275
std::forward<F>(f)(&(el->data_));
271276
for (unsigned k = 0;;) {
272277
auto cur_rc = el->rc_.load(std::memory_order_acquire);
273-
circ::cc_t rem_cc = cur_rc & ep_mask;
274-
if (rem_cc == 0) {
278+
if ((cur_rc & ep_mask) == 0) {
279+
std::forward<R>(out)(true);
275280
return true;
276281
}
277-
if (el->rc_.compare_exchange_weak(cur_rc,
278-
cur_rc & ~static_cast<rc_t>(wrapper->connected_id()),
279-
std::memory_order_release)) {
282+
auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
283+
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
284+
std::forward<R>(out)((nxt_rc & ep_mask) == 0);
280285
return true;
281286
}
282287
ipc::yield(k);
@@ -395,8 +400,8 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
395400
return true;
396401
}
397402

398-
template <typename W, typename F, typename E, std::size_t N>
399-
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) {
403+
template <typename W, typename F, typename R, typename E, std::size_t N>
404+
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
400405
auto* el = elems + circ::index_of(cur);
401406
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
402407
if (cur_fl != ~static_cast<flag_t>(cur)) {
@@ -406,17 +411,18 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
406411
std::forward<F>(f)(&(el->data_));
407412
for (unsigned k = 0;;) {
408413
auto cur_rc = el->rc_.load(std::memory_order_acquire);
409-
circ::cc_t rem_cc = cur_rc & rc_mask;
410-
if (rem_cc == 0) {
414+
if ((cur_rc & rc_mask) == 0) {
415+
std::forward<R>(out)(true);
411416
el->f_ct_.store(cur + N - 1, std::memory_order_release);
412417
return true;
413418
}
414-
if ((rem_cc & ~wrapper->connected_id()) == 0) {
419+
auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
420+
bool last_one = false;
421+
if ((last_one = (nxt_rc & rc_mask) == 0)) {
415422
el->f_ct_.store(cur + N - 1, std::memory_order_release);
416423
}
417-
if (el->rc_.compare_exchange_weak(cur_rc,
418-
inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id()),
419-
std::memory_order_release)) {
424+
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
425+
std::forward<R>(out)(last_one);
420426
return true;
421427
}
422428
ipc::yield(k);

src/libipc/queue.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,14 @@ class queue_base : public queue_conn {
171171
});
172172
}
173173

174-
template <typename T>
175-
bool pop(T& item) {
174+
template <typename T, typename F>
175+
bool pop(T& item, F&& out) {
176176
if (elems_ == nullptr) {
177177
return false;
178178
}
179179
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
180180
::new (&item) T(std::move(*static_cast<T*>(p)));
181-
});
181+
}, std::forward<F>(out));
182182
}
183183
};
184184

@@ -204,7 +204,12 @@ class queue final : public detail::queue_base<typename Policy::template elems_t<
204204
}
205205

206206
bool pop(T& item) {
207-
return base_t::pop(item);
207+
return base_t::pop(item, [](bool) {});
208+
}
209+
210+
template <typename F>
211+
bool pop(T& item, F&& out) {
212+
return base_t::pop(item, std::forward<F>(out));
208213
}
209214
};
210215

0 commit comments

Comments
 (0)