Skip to content

Commit 7dd9937

Browse files
committed
try to adjust recycling strategy for large message cache
1 parent 2179ce2 commit 7dd9937

File tree

4 files changed

+82
-57
lines changed

4 files changed

+82
-57
lines changed

src/ipc.cpp

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232

3333
namespace {
3434

35-
using msg_id_t = std::uint32_t;
36-
using acc_t = std::atomic<msg_id_t>;
35+
using msg_id_t = std::uint32_t;
36+
using acc_t = std::atomic<msg_id_t>;
3737

3838
template <std::size_t DataSize, std::size_t AlignSize>
3939
struct msg_t;
@@ -92,6 +92,14 @@ auto cc_acc() {
9292
return static_cast<acc_t*>(acc_h.get());
9393
}
9494

95+
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
96+
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
97+
}
98+
99+
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
100+
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size));
101+
}
102+
95103
struct chunk_info_t {
96104
ipc::id_pool<> pool_;
97105
ipc::spin_lock lock_;
@@ -100,9 +108,13 @@ struct chunk_info_t {
100108
return ipc::id_pool<>::max_count * chunk_size;
101109
}
102110

103-
ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
104-
if (id < 0) return nullptr;
105-
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunk_size * id);
111+
ipc::byte_t* chunks_mem() noexcept {
112+
return reinterpret_cast<ipc::byte_t*>(this + 1);
113+
}
114+
115+
ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
116+
assert(id >= 0);
117+
return chunks_mem() + (chunk_size * id);
106118
}
107119
};
108120

@@ -130,16 +142,11 @@ auto& chunk_storages() {
130142
return chunk_s;
131143
}
132144

133-
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
134-
return ipc::make_align(alignof(std::max_align_t),
135-
(((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align);
136-
}
137-
138145
chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
139146
return chunk_storages()[chunk_size].get_info(chunk_size);
140147
}
141148

142-
std::pair<ipc::storage_id_t, void*> apply_storage(std::size_t size) {
149+
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size) {
143150
std::size_t chunk_size = calc_chunk_size(size);
144151
auto info = chunk_storage_info(chunk_size);
145152
if (info == nullptr) return {};
@@ -166,14 +173,12 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) {
166173

167174
void release_storage(ipc::storage_id_t id, std::size_t size) {
168175
if (id < 0) {
169-
ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
176+
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
170177
return;
171178
}
172-
173179
std::size_t chunk_size = calc_chunk_size(size);
174180
auto info = chunk_storage_info(chunk_size);
175181
if (info == nullptr) return;
176-
177182
info->lock_.lock();
178183
info->pool_.release(id);
179184
info->lock_.unlock();
@@ -394,7 +399,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
394399
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
395400
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
396401
if (size > ipc::large_msg_limit) {
397-
auto dat = apply_storage(size);
402+
auto dat = acquire_storage(size);
398403
void * buf = dat.second;
399404
if (buf != nullptr) {
400405
std::memcpy(buf, data, size);
@@ -429,7 +434,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
429434
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
430435
if (!wait_for(info->wt_waiter_, [&] {
431436
return !que->push(
432-
recycle_message<typename queue_t::value_t>,
437+
[](void*) { return true; },
433438
info->cc_id_, msg_id, remain, data, size);
434439
}, tm)) {
435440
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
@@ -450,7 +455,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
450455
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
451456
if (!wait_for(info->wt_waiter_, [&] {
452457
return !que->push(
453-
recycle_message<typename queue_t::value_t>,
458+
[](void*) { return true; },
454459
info->cc_id_, msg_id, remain, data, size);
455460
}, tm)) {
456461
return false;
@@ -475,7 +480,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
475480
for (;;) {
476481
// pop a new message
477482
typename queue_t::value_t msg;
478-
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
483+
bool recycled = false;
484+
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] {
485+
return !que->pop(msg, [&recycled](bool r) { recycled = r; });
486+
}, tm)) {
479487
// pop failed, just return.
480488
return {};
481489
}
@@ -490,23 +498,29 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
490498
return {};
491499
}
492500
std::size_t msg_size = static_cast<std::size_t>(r_size);
501+
// large message
502+
if (msg.storage_) {
503+
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
504+
void* buf = find_storage(buf_id, msg_size);
505+
if (buf != nullptr) {
506+
if (recycled) {
507+
return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) {
508+
release_storage(reinterpret_cast<ipc::storage_id_t>(pmid) - 1, size);
509+
}, reinterpret_cast<void*>(buf_id + 1)};
510+
} else {
511+
return ipc::buff_t{buf, msg_size}; // no recycle
512+
}
513+
} else {
514+
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
515+
continue;
516+
}
517+
}
493518
// find cache with msg.id_
494519
auto cac_it = rc.find(msg.id_);
495520
if (cac_it == rc.end()) {
496521
if (msg_size <= ipc::data_length) {
497522
return make_cache(msg.data_, msg_size);
498523
}
499-
if (msg.storage_) {
500-
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
501-
void * buf = find_storage(buf_id, msg_size);
502-
if (buf != nullptr) {
503-
return ipc::buff_t{buf, msg_size};
504-
}
505-
else {
506-
ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
507-
continue;
508-
}
509-
}
510524
// gc
511525
if (rc.size() > 1024) {
512526
std::vector<msg_id_t> need_del;

src/libipc/circ/elem_array.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ class elem_array : public ipc::circ::conn_head<Policy> {
130130
return head_.force_push(que, std::forward<F>(f), block_);
131131
}
132132

133-
template <typename Q, typename F>
134-
bool pop(Q* que, cursor_t* cur, F&& f) {
133+
template <typename Q, typename F, typename R>
134+
bool pop(Q* que, cursor_t* cur, F&& f, R&& out) {
135135
if (cur == nullptr) return false;
136-
return head_.pop(que, *cur, std::forward<F>(f), block_);
136+
return head_.pop(que, *cur, std::forward<F>(f), std::forward<R>(out), block_);
137137
}
138138
};
139139

src/libipc/prod_cons.h

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,14 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
5858
return false;
5959
}
6060

61-
template <typename W, typename F, typename E>
62-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) {
61+
template <typename W, typename F, typename R, typename E>
62+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) {
6363
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
6464
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
6565
return false; // empty
6666
}
6767
std::forward<F>(f)(&(elems[cur_rd].data_));
68+
std::forward<R>(out)(true);
6869
rd_.fetch_add(1, std::memory_order_release);
6970
return true;
7071
}
@@ -80,8 +81,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
8081
return false;
8182
}
8283

83-
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
84-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
84+
template <typename W, typename F, typename R,
85+
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
86+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
8587
byte_t buff[DS];
8688
for (unsigned k = 0;;) {
8789
auto cur_rd = rd_.load(std::memory_order_relaxed);
@@ -92,6 +94,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
9294
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
9395
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
9496
std::forward<F>(f)(buff);
97+
std::forward<R>(out)(true);
9598
return true;
9699
}
97100
ipc::yield(k);
@@ -156,8 +159,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
156159
return false;
157160
}
158161

159-
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
160-
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
162+
template <typename W, typename F, typename R,
163+
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
164+
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
161165
byte_t buff[DS];
162166
for (unsigned k = 0;;) {
163167
auto cur_rd = rd_.load(std::memory_order_relaxed);
@@ -179,6 +183,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
179183
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
180184
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
181185
std::forward<F>(f)(buff);
186+
std::forward<R>(out)(true);
182187
return true;
183188
}
184189
ipc::yield(k);
@@ -263,20 +268,20 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
263268
return true;
264269
}
265270

266-
template <typename W, typename F, typename E>
267-
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) {
271+
template <typename W, typename F, typename R, typename E>
272+
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
268273
if (cur == cursor()) return false; // acquire
269274
auto* el = elems + circ::index_of(cur++);
270275
std::forward<F>(f)(&(el->data_));
271276
for (unsigned k = 0;;) {
272277
auto cur_rc = el->rc_.load(std::memory_order_acquire);
273-
circ::cc_t rem_cc = cur_rc & ep_mask;
274-
if (rem_cc == 0) {
278+
if ((cur_rc & ep_mask) == 0) {
279+
std::forward<R>(out)(true);
275280
return true;
276281
}
277-
if (el->rc_.compare_exchange_weak(cur_rc,
278-
cur_rc & ~static_cast<rc_t>(wrapper->connected_id()),
279-
std::memory_order_release)) {
282+
auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
283+
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
284+
std::forward<R>(out)((nxt_rc & ep_mask) == 0);
280285
return true;
281286
}
282287
ipc::yield(k);
@@ -395,8 +400,8 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
395400
return true;
396401
}
397402

398-
template <typename W, typename F, typename E, std::size_t N>
399-
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) {
403+
template <typename W, typename F, typename R, typename E, std::size_t N>
404+
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
400405
auto* el = elems + circ::index_of(cur);
401406
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
402407
if (cur_fl != ~static_cast<flag_t>(cur)) {
@@ -406,17 +411,18 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
406411
std::forward<F>(f)(&(el->data_));
407412
for (unsigned k = 0;;) {
408413
auto cur_rc = el->rc_.load(std::memory_order_acquire);
409-
circ::cc_t rem_cc = cur_rc & rc_mask;
410-
if (rem_cc == 0) {
414+
if ((cur_rc & rc_mask) == 0) {
415+
std::forward<R>(out)(true);
411416
el->f_ct_.store(cur + N - 1, std::memory_order_release);
412417
return true;
413418
}
414-
if ((rem_cc & ~wrapper->connected_id()) == 0) {
419+
auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
420+
bool last_one = false;
421+
if ((last_one = (nxt_rc & rc_mask) == 0)) {
415422
el->f_ct_.store(cur + N - 1, std::memory_order_release);
416423
}
417-
if (el->rc_.compare_exchange_weak(cur_rc,
418-
inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id()),
419-
std::memory_order_release)) {
424+
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
425+
std::forward<R>(out)(last_one);
420426
return true;
421427
}
422428
ipc::yield(k);

src/libipc/queue.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,14 @@ class queue_base : public queue_conn {
171171
});
172172
}
173173

174-
template <typename T>
175-
bool pop(T& item) {
174+
template <typename T, typename F>
175+
bool pop(T& item, F&& out) {
176176
if (elems_ == nullptr) {
177177
return false;
178178
}
179179
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
180180
::new (&item) T(std::move(*static_cast<T*>(p)));
181-
});
181+
}, std::forward<F>(out));
182182
}
183183
};
184184

@@ -204,7 +204,12 @@ class queue final : public detail::queue_base<typename Policy::template elems_t<
204204
}
205205

206206
bool pop(T& item) {
207-
return base_t::pop(item);
207+
return base_t::pop(item, [](bool) {});
208+
}
209+
210+
template <typename F>
211+
bool pop(T& item, F&& out) {
212+
return base_t::pop(item, std::forward<F>(out));
208213
}
209214
};
210215

0 commit comments

Comments
 (0)