coroutine types:
coroutine synchronization primitives:
algorithms:
executors:
any_executor_ref, jump_on, task_queue, worker, strand, thread_pool, noop_executor, this_thread_executor, new_thread_executor
customizing memory allocation:
task is a lazy coroutine: it returns one value (or rethrows an exception). Lazy means execution starts only when the task is co_awaited or when start_and_detach() is called.
interface:
// movable, default constructible type
// Result - what will be returned from coroutine
// Ctx - for advanced usage and customization, see below
template <typename Result, typename Ctx = null_context>
struct task {
// precondition: !empty()
// co_await returns Result&& or rethrows exception
auto operator co_await() &;
// precondition: !empty()
// co_await returns Result or rethrows exception
auto operator co_await() &&;
// precondition: !empty()
// co_awaits until the task is done, but does not get the value / rethrow the exception
auto wait() noexcept;
bool empty() const noexcept;
explicit operator bool() const noexcept { return !empty(); }
// precondition: empty() || not started yet
// postcondition: empty(), task result ignored (exception too)
// returns released task handle
// if 'stop_at_end' is false, then task will delete itself at end,
// otherwise handle.destroy() should be called
handle_type start_and_detach(bool stop_at_end = false);
// precondition: not started yet
// postcondition: empty()
// example: thread_pool.schedule(mytask().detach())
handle_type detach() noexcept;
Ctx* get_context() const noexcept;
};

about task contexts (advanced usage)
// Ctx may be passed as the second `task` template argument for customization,
// e.g. to build custom traces and `call graphs` of coroutines
// or to customize how unhandled exceptions are handled
struct null_context {
// invoked when the task is scheduled to execute; 'owner' is the handle of the coroutine which awaits the task
// never invoked with nullptrs
// note: one task may have only one owner, but one owner can have many child tasks (when_all/when_any)
template <typename OwnerPromise, typename P>
static void on_owner_setted(std::coroutine_handle<OwnerPromise>, std::coroutine_handle<P>) noexcept {}
// may be invoked without a prior 'on_owner_setted' call if the task is detached
template <typename P>
static void on_start(std::coroutine_handle<P>) noexcept {}
// invoked when the task ends, with or without an exception
template <typename P>
static void on_end(std::coroutine_handle<P>) noexcept {}
// invoked when no one waits for the task and an exception was thrown
// precondition: std::current_exception() != nullptr
// Note: if an exception is thrown a memory leak is possible (no one will destroy the handle)
// this function MUST NOT destroy the handle (UB) and the handle must not be destroyed while the exception is alive
// Note: the passed coroutine handle will become invalid after this call
template <typename P>
static void on_ignored_exception(std::coroutine_handle<P>) noexcept {}
};
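For illustration, a minimal sketch of a custom context using only the hooks documented above (not part of the library):

// a hedged sketch: a context that traces the lifetime of a task
struct tracing_context {
  template <typename OwnerPromise, typename P>
  static void on_owner_setted(std::coroutine_handle<OwnerPromise> owner,
                              std::coroutine_handle<P> task) noexcept {
    std::clog << "task " << task.address() << " awaited by " << owner.address() << '\n';
  }
  template <typename P>
  static void on_start(std::coroutine_handle<P> task) noexcept {
    std::clog << "task " << task.address() << " started\n";
  }
  template <typename P>
  static void on_end(std::coroutine_handle<P> task) noexcept {
    std::clog << "task " << task.address() << " finished\n";
  }
  template <typename P>
  static void on_ignored_exception(std::coroutine_handle<P>) noexcept {
    std::clog << "detached task ended with an unhandled exception\n";
  }
};
// used as the second template argument:
dd::task<int, tracing_context> traced_foo();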
example with creating and using task:
dd::task<load_response> load_from_database(database_ptr client, user_id id) {
request req = make_load_req(id);
response rsp = co_await client->send_request(req);
co_return parse_load_rsp(rsp);
}
dd::task<bool> some_foo() {
..
load_response rsp = co_await load_from_database(client, id);
co_return rsp.ok();
}
// or same with `wait`
dd::task<bool> some_foo() {
..
dd::task t = load_from_database(client, id);
co_await t.wait();
// all non-special promise methods are part of the public interface too
load_response rsp = t.raw_handle().promise().result_or_rethrow();
co_return rsp.ok();
}
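A minimal fire-and-forget sketch with start_and_detach (event and log_event are hypothetical):

dd::task<void> log_event(event e); // hypothetical coroutine

void on_event(event e) {
  // result (and any exception) is ignored, the coroutine frame destroys itself at the end
  log_event(e).start_and_detach();
}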
generator
interface:
template <yieldable Yield>
struct generator {
// * if .empty(), then begin() == end()
// * produces the next value (often the first)
// * iterators are invalidated only when the generator dies
iterator begin() & [[clang::lifetimebound]];
static std::default_sentinel_t end() noexcept;
// postcondition: empty(), 'for' loop produces 0 values
constexpr generator() noexcept = default;
// postconditions:
// * other.empty()
// * iterators to 'other' == end()
constexpr generator(generator&& other) noexcept;
constexpr generator& operator=(generator&& other) noexcept;
bool operator==(const generator& other) const noexcept;
bool empty() const noexcept;
explicit operator bool() const noexcept {
return !empty();
}
void reset(handle_type handle) noexcept;
// postcondition: .empty()
void clear() noexcept;
// postcondition: .empty()
// it is the caller's responsibility to correctly destroy the handle
handle_type release() noexcept;
};

- produces the next/first value when .begin is called
- recursive (see co_yield dd::elements_of(rng))
- default constructed generator is an empty range
notes:
- tag dd::elements_of(RNG) accepts a range (including another generator) and yields all of its elements more efficiently than a for-loop
- tag by_ref allows yielding lvalue references, so the caller may get a reference from the iterator's operator* and change the value, and the generator will observe the changes. This allows using a generator as an in-place output range, which can be very useful
usage as output iterator example:
dd::generator<int> printer() {
int i;
while (true) {
co_yield dd::by_ref{i};
print(i);
}
}
void print_foo(std::vector<int> values) {
// prints all values
std::copy(begin(values), end(values), printer().begin().out());
}

- operator* of the iterator returns an rvalue reference to the value, so
for (std::string s : generator()) is efficient code (one move out of the generator)
example:
dd::generator<int> ints() {
for (int i = 0; i < 100; ++i)
co_yield i;
}
dd::generator<int> intsints() {
co_yield dd::elements_of(ints());
co_yield dd::elements_of(ints());
}
void use() {
for (int i : intsints())
do_smth(i);
}

channel
interface: literally the same as dd::generator, but 'begin' and operator++ on the iterator require co_await. The co_foreach macro is provided for easy usage.
example:
dd::channel<int> ints() {
co_yield 1;
co_yield 2;
}
dd::channel<int> ints_creator() {
for (int i = 0; i < 100; ++i) {
int value = co_await foo(); // any hard work to calculate `value`
co_yield value; // control returns to caller ONLY after co_yield!
co_yield dd::elements_of(ints()); // elements_of works too
}
}
dd::task<void> user() {
co_foreach(int i, ints_creator())
use(i);
}
// if you want to not use macro, then:
// auto c = ints_creator();
// for (auto b = co_await c.begin(); b != c.end(); co_await ++b)
// use(*b);
job: a coroutine for detached work. It starts immediately (not lazy), similar to a task<void> that is always start_and_detach()'ed. Calls std::terminate on an unhandled exception.
Mostly used for low-level implementations.
example:
dd::job periodic_task() {
mytimer t;
for (;;) {
errc e = co_await t.sleep(5s);
if (e == canceled)
co_return;
do_foo();
}
}
async_task is similar to std::future: it produces a value or rethrows an exception and allows blocking waiting for the result even if co_return happens on another thread.
Execution starts immediately when the async_task is created.
The result can safely be ignored.
Mostly used when it is required to block-wait for a task on another thread.
interface:
template <typename Result>
struct async_task {
async_task() noexcept = default;
async_task(async_task&& other) noexcept;
// postcondition: if !empty(), then coroutine suspended and value produced
void wait() const noexcept;
// returns true if 'get' is callable and will return immediately without waiting
bool ready() const noexcept;
// postcondition: empty()
void detach() noexcept;
// precondition: !empty()
// may be invoked on a different thread, but only on one
std::add_rvalue_reference_t<Result> get() &;
// precondition: !empty()
// may be invoked on a different thread, but only on one
Result get() &&;
bool empty() const noexcept;
};

example:
dd::async_task<int> future_int() {
co_await dd::jump_on(my_executor);
auto x = co_await foo();
co_return bar(x);
}
..
void foo() {
int x = future_int().get(); // blocking wait
}

interface (similar to std::jthread):
struct logical_thread {
logical_thread() noexcept = default;
logical_thread(logical_thread&&) noexcept;
bool joinable();
void join(); // block until coroutine is done
void detach();
bool stop_possible() const noexcept;
bool request_stop();
// co_await dd::this_coro::stop_token will return dd::stop_token for coroutine
};

It is a cancellable coroutine whose behavior is similar to https://en.cppreference.com/w/cpp/thread/jthread (it can be .request_stop()'ed, and is automatically requested to stop and joined in the destructor or on move assignment, etc.)
Execution starts immediately when the logical_thread is created. If an unhandled exception happens in a logical_thread, std::terminate is called.
example:
dd::logical_thread Bar() {
// imagine that it is already C++47 and networking is in the standard
auto socket = co_await async_connect(endpoint);
auto token = co_await dd::this_coro::stop_token; // a cancellable coroutine can get the stop token associated with it
while (!token.stop_requested()) {
auto write_info = co_await socket.async_write("Hello world");
auto read_result = co_await socket.async_read();
std::cout << read_result;
}
}

road may be closed or opened. If closed, coroutines may wait until it is opened. Must be used in one thread.
interface:
struct road {
// precondition: not closed (use wait_open in a loop...)
void close() noexcept;
[[nodiscard]] bool closed() const noexcept;
// notifies all waiters that the road is opened
void open(executor auto& e);
// co_await returns `bool` == !this->closed()
// Note: resuming waiters in `open` may lead to .close again;
// this means `wait_open` often needs to be used in a loop.
// Example:
// while (!co_await r.wait_open())
// [[unlikely]];
open_awaiter wait_open() noexcept;
};

example:
// blocks all writes for 5s
dd::task<void> close_for_5s(road& write_road) {
// avoid other close_for_5s running concurrently, even in a single thread!
while (!co_await write_road.wait_open())
[[unlikely]];
write_road.close();
co_await my_sleep_for(5s);
write_road.open(dd::this_thread_executor);
}
..
dd::task<void> writer(road& write_road) {
// loop to avoid using a closed road if `open` in close_for_5s resumes someone who closes the road again
while (!co_await write_road.wait_open())
[[unlikely]];
do_write();
}

gate tracks the number of running operations; it may be used to forbid new operations and to await until all running operations are done. Must be used in one thread.
interface:
// thrown when `enter` is called on a closed `gate`
struct gate_closed_exception : std::exception {};
// Note: all methods should be called from one thread
struct gate {
// if gate is closed, returns false.
// Otherwise returns true and caller must call 'leave' in future
bool try_enter() noexcept;
// throws `gate_closed_exception` if closed
void enter();
// must be invoked only after invoking 'try_enter'
// Note: may resume close awaiters.
// Sometimes it makes sense to push them into a task list instead of executing them now,
// e.g. when the caller of `leave` uses resources which the resumed awaiters will destroy
void leave() noexcept;
struct holder;
// combines enter + leave on scope exit
holder hold();
// how many successful 'try_enter' calls are not yet finished by 'leave'
size_t active_count() const noexcept;
// postcondition (after co_await): is_closed() == true && `active_count()` == 0
// Note: is_closed() is true even before co_await
// Note: several coroutines may wait for `close`. All close awaiters will be resumed.
// reopen() will allow calling 'close' again
gate_close_awaiter close() noexcept;
bool is_closed() const noexcept;
// after `reopen` gate may be `entered` or `closed` again
// precondition: .active_count() == 0 && no one waits for `close`
void reopen() noexcept;
};
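A minimal usage sketch based on the gate interface above (connection and process are hypothetical):

dd::task<void> handle_connection(dd::gate& g, connection c) {
  // enter now, leave automatically on scope exit; throws gate_closed_exception if the gate is closed
  auto guard = g.hold();
  co_await process(c);
}

dd::task<void> shutdown(dd::gate& g) {
  // forbid new operations and wait until all active operations call 'leave'
  co_await g.close();
  // here: g.is_closed() && g.active_count() == 0
}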
latch: same as std::latch, but for coroutines.
interface:
// non movable
struct latch {
// precondition: count >= 0 and <= max,
// `e` is used to resume all waiters when the counter reaches 0
constexpr explicit latch(ptrdiff_t count, any_executor_ref e) noexcept;
static constexpr ptrdiff_t max() noexcept;
// decrements the internal counter by n without blocking the caller
// precondition: n >= 0 && n <= internal counter
void count_down(std::ptrdiff_t n = 1) noexcept;
// returns true if the internal counter has reached zero
// never blocks; same as 'try_wait' in std::latch
bool ready() const noexcept;
// suspends the calling coroutine until the internal counter reaches 0.
// If it is zero already, returns immediately
co_awaiter auto wait() noexcept;
// precondition: n >= 0 && n <= internal counter
// logically equivalent to count_down(n); wait(); (but atomic: a separate count_down + wait would be a data race)
co_awaiter auto arrive_and_wait(std::ptrdiff_t n = 1) noexcept;
any_executor_ref get_executor() const noexcept;
};
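A minimal usage sketch based on the latch interface above (do_work and spawn_worker are hypothetical, and any_executor_ref is assumed to live in namespace dd like the other types):

dd::task<void> worker_task(dd::latch& l) {
  co_await do_work();  // hypothetical
  l.count_down();      // the last count_down resumes everyone waiting on the latch
}

dd::task<void> coordinator(dd::any_executor_ref exe) {
  dd::latch l(3, exe);   // waiters are resumed on 'exe' when the counter reaches 0
  for (int i = 0; i < 3; ++i)
    spawn_worker(l);     // hypothetical: starts worker_task(l) somewhere
  co_await l.wait();     // resumes after all three workers have counted down
}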
when_all waits until all given tasks are done.
interface:
template <typename... Ts, typename Ctx>
auto when_all(task<Ts, Ctx>... tasks) -> task<std::tuple<expected<Ts, std::exception_ptr>...>, Ctx>;
template <typename T, typename Ctx>
auto when_all(std::vector<task<T, Ctx>> tasks) -> task<std::vector<expected<T, std::exception_ptr>>, Ctx>;
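A minimal sketch of when_all usage based on the signatures above (foo1/foo2 are hypothetical):

dd::task<void> foo1();
dd::task<int> foo2();

dd::task<void> gather() {
  // results is std::tuple<expected<void, std::exception_ptr>, expected<int, std::exception_ptr>>
  auto results = co_await dd::when_all(foo1(), foo2());
  // both tasks are finished here; inspect std::get<1>(results) for the int value or the stored exception
}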
when_any waits for and returns the value and index of the first task that did not fail. If all tasks failed (with an exception), the last exception is returned.
Note: the other tasks are not cancelled after the first one is done. It is the caller's responsibility not to destroy resources still used by such tasks.
interface:
// precondition: all tasks not .empty()
// returns first not failed or last exception if all failed
template <typename... Ts, typename... Ctx>
auto when_any(task<Ts, Ctx>... tasks) -> task<std::variant<expected<Ts, std::exception_ptr>...>, first_type_t<Ctx...>>;
template <typename T>
struct when_any_dyn_result {
expected<T, std::exception_ptr> value;
size_t index() const noexcept;
};
// precondition: all tasks not .empty()
// returns value and index of first not failed or last exception if all failed
template <typename T, typename Ctx>
auto when_any(std::vector<task<T, Ctx>> tasks) -> task<when_any_dyn_result<T>, Ctx>;

example:
dd::task<void> foo1();
dd::task<int> foo2();
dd::task<int> bar() {
std::variant r = co_await dd::when_any(foo1(), foo2());
switch (r.index()) {
case 0:
// foo1() done first
break;
case 1:
// foo2 done first
break;
}
}
`with` binds arguments to a task so that they are destroyed only after the task has finished executing.
interface:
// precondition: !t.empty()
template <typename T, typename Ctx>
task<T, Ctx> with(task<T, Ctx> t, auto...) { co_return co_await t; }

example:
dd::task<void> foo(data_t*);
..
std::shared_ptr data = std::make_shared<data_t>();
dd::with(foo(data.get()), data).start_and_detach();

chain is `then` with a more convenient interface: it co_awaits awaitables one by one in a sequence, passing the result of the previous one into the next (if the next is a function), and stops on the first exception.
Note: if a value is both co_awaitable and invocable, compilation fails (ambiguous call)
interface:
// a set of overloads; this signature is shown for understanding the interface
// starts always with co_awaitable, then continues with co_awaitable
// OR function, which accepts result of previous co_await and returns co_awaitable
template <typename Ctx = null_context, co_awaitable A, typename Head, typename... Tail>
auto chain(A t, Head foo, Tail... tail) -> task<...> (type deduced)
// helper, ignores result of previous co_await in `chain`
constexpr inline ignore_result_t ignore_result = {};
example:
dd::task<int> foo(std::string);
dd::task<std::string> bar();
..
int x = co_await dd::chain(bar(), [] (std::string s) { return foo(std::move(s)); });
..

example with ignore_result:
dd::task<response> handle_request(request);
dd::task<bool> validate_rsp(response);
dd::task<void> do_complex_handling(request r) {
// the lambda guarantees that the validator is created only after `handle_request`, not now
auto make_validator = [](response r) { return validate_rsp(std::move(r)); };
auto assert_schedule_status = [](dd::schedule_status s) {
assert(!s);
return std::suspend_never{};
};
return dd::chain(dd::jump_on(executor), assert_schedule_status, handle_request(std::move(r)), make_validator,
dd::ignore_result);
}
kelcoro defines the concept executor and the struct task_node.
Basically, an executor is any type which supports .attach(task_node*).
This interface greatly simplifies implementations and improves performance by removing allocations and type erasure.
executor and task node definitions
// task_node is an intrusive node for the implementation of executors; it may be used in user-defined awaiters / executors
struct task_node {
task_node* next = nullptr;
std::coroutine_handle<> task = nullptr;
schedule_errc status = schedule_errc::ok;
};
template <typename T>
concept executor = requires(T& exe, task_node* node) {
// preconditions for .attach:
// node && node->task
// node lives until node->task.resume() and is invalidated immediately after that
exe.attach(node);
// effect: schedules node->task to be executed on 'exe'
};
// ADL customization point, works for all kelcoro executors
// may be overloaded for your executor type, should return awaitable which
// schedules execution of coroutine to 'e'
template <executor E>
auto jump_on(E&& e) noexcept;
// specialization for dd::thread_pool
// Note: selects thread where task will be executed based on operation hash.
// this guarantees that if the same coroutine jumps on the same thread pool, it always runs on the same thread
auto jump_on(thread_pool& tp) noexcept;
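A minimal sketch of a user-defined executor satisfying the concept above (assuming `executor` and `task_node` live in namespace dd); it resumes attached tasks immediately on the calling thread, similar in spirit to this_thread_executor:

struct inline_executor {
  void attach(dd::task_node* node) {
    // concept precondition: node && node->task
    node->task.resume();  // 'node' is invalidated immediately after resume()
  }
};
static_assert(dd::executor<inline_executor>);
// works with jump_on like any other executor:
// co_await dd::jump_on(my_inline_executor);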
any_executor_ref is a type-erased reference to an executor. It also satisfies the executor concept.
interface:
struct any_executor_ref {
template <executor E>
constexpr any_executor_ref(E& exe KELCORO_LIFETIMEBOUND)
requires(!std::same_as<std::remove_volatile_t<E>, any_executor_ref> && !std::is_const_v<E>);
any_executor_ref(const any_executor_ref&) = default;
any_executor_ref(any_executor_ref&&) = default;
// executor interface
void attach(task_node* node);
};

task_queue is a thread-safe queue of tasks. Note that it also satisfies the executor concept, but does not execute tasks itself (someone can execute them later).
interface:
struct task_queue {
// precondition: node != nullptr && node is not contained in queue
void push(task_node* node);
// attach a whole linked list
void push_list(task_node* first, task_node* last);
// finds last
void push_list(task_node* first);
task_node* pop_all();
// blocking; waits until at least one task is pushed
// postcondition: the returned task_node != nullptr
task_node* pop_all_not_empty();
// executor interface
void attach(task_node* node) noexcept;
};
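A minimal sketch of executing queued tasks manually, based on the interface above (it assumes the popped list is linked via `next` and nullptr-terminated):

void drain(dd::task_queue& q) {
  // blocks until at least one node is pushed, then takes the whole list
  dd::task_node* node = q.pop_all_not_empty();
  while (node) {
    dd::task_node* next = node->next;  // save before resume: 'node' is invalidated by it
    node->task.resume();
    node = next;
  }
}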
worker: executor, basically a task_queue + std::thread.

strand: executor, basically just a reference to the task_queue of a concrete worker.
Guarantees that all pushed tasks will be executed on one thread, in push order.
interface:
struct strand {
explicit strand(worker&);
explicit strand(task_queue&);
task_queue& get_queue() const noexcept;
// executor interface
void attach(task_node* node) const noexcept;
};

thread_pool: executor, non-stealing, manages many workers.
interface:
// distributes tasks among workers
// co_await jump_on(pool) schedules coroutine to thread pool
// note: when the thread pool dies, all pending tasks are invoked with schedule_errc::cancelled
struct thread_pool {
static size_t default_thread_count();
explicit thread_pool(size_t thread_count = default_thread_count(), worker::job_t job = default_worker_job,
std::pmr::memory_resource& r = *std::pmr::get_default_resource());
thread_pool(thread_pool&&) = delete;
void operator=(thread_pool&&) = delete;
// executor interface. used by dd::jump_on(pool) internally
void attach(task_node* node) noexcept;
bool is_worker(std::thread::id id = std::this_thread::get_id());
// same as dd::schedule_to(pool), but uses pool memory resource to allocate tasks
void schedule(std::invocable auto&& foo, operation_hash_t hash);
void schedule(std::invocable auto&& foo);
std::span<std::thread> workers_range() noexcept;
std::span<const std::thread> workers_range() const noexcept;
std::span<task_queue> queues_range() noexcept;
std::span<const task_queue> queues_range() const noexcept;
// see `strand` doc
strand get_strand(operation_hash_t op_hash);
task_queue& select_queue(operation_hash_t op_hash) noexcept;
// can be called as many times as you want, from any thread
void request_stop();
// NOTE: can't be called from workers
// NOTE: can't be called more than once
// Wait for the job to complete (after calling `request_stop`)
void wait_stop() &&;
};
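A minimal usage sketch based on the thread_pool interface above (heavy_computation and other_work are hypothetical):

dd::thread_pool pool(4);

dd::task<void> work() {
  co_await dd::jump_on(pool);  // continue on one of the pool's workers
  heavy_computation();
}

void use() {
  pool.schedule([] { other_work(); });  // run a plain callable on the pool
  work().start_and_detach();
  // ... later, at shutdown:
  pool.request_stop();
  std::move(pool).wait_stop();  // '&&' qualified: waits for the workers to finish
}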
noop_executor: executor, does nothing for each .attach(task_node*).

this_thread_executor: executor, .attach(task_node*) invokes the task on the current thread.

new_thread_executor: executor, each .attach(task_node*) creates a new std::thread which will execute the task. Mostly used for tests.
kelcoro uses a memory_resource concept instead of allocators for coroutines. The requirements for a memory resource are weaker than for an allocator.
concept memory resource
template <typename T>
concept memory_resource = !std::is_reference_v<T> && requires(T value, size_t sz, void* ptr) {
// the result must be aligned at least as dd::coroframe_align()
// an alignment argument is not required, because the standard does not provide an interface
// for passing alignment to the promise_type operator new
{ value.allocate(sz) } -> std::convertible_to<void*>;
// if the type is trivially destructible, it must handle the case when `value` itself lies in the memory under `ptr`
{ value.deallocate(ptr, sz) } -> std::same_as<void>;
requires std::is_nothrow_move_constructible_v<T>;
requires alignof(T) <= alignof(std::max_align_t);
requires !(std::is_empty_v<T> && !std::default_initializable<T>);
};
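A minimal sketch of a type satisfying this concept: it just counts allocations and forwards to global operator new/delete (assuming their alignment is sufficient for dd::coroframe_align()):

struct counting_resource {
  size_t allocations = 0;
  void* allocate(size_t sz) {
    ++allocations;
    return ::operator new(sz);  // assumption: default alignment is enough for coroutine frames
  }
  void deallocate(void* ptr, size_t sz) noexcept {
    ::operator delete(ptr, sz);
  }
};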
All kelcoro coroutines detect the memory resource from a with_resource tag passed as the last argument to the coroutine.
If you always want to use another resource without changing the signature of the function, use the *_r versions of coroutines, e.g. dd::task_r<T, YourResource>. In this case, if YourResource is default constructible, it will be used when the coroutine frame is allocated.
with_resource: mostly used as a tag in combination with chunk_from.
chunk_from: a reference to a resource for convenient usage, mostly used in combination with with_resource.
example:
using with_my_resource = dd::with_resource<dd::chunk_from<MyResource>>;
dd::task<T> foo(int arg, with_my_resource = get_my_resource());
dd::generator<int> gen(dd::with_stack_resource r) {
co_yield 5;
}
void bar() {
dd::stack_resource r;
// `gen` will use `r` for allocation
for (int x : gen(r))
use(x);
}
stack_resource: a memory resource which implements the most efficient allocation strategy possible for the typical case when a coroutine waits until another coroutine is done.
.deallocate(ptr) assumes that ptr is the last allocated pointer; like on a real stack, only the last chunk may be deallocated.
This makes it possible to allocate memory efficiently and reuse it.
interface:
/*
assumes, that only last allocated chunk may be deallocated (like real stack)
this is a common case for coroutines, but this resource must be used carefully
to preserve order of deallocations
all allocated pointers aligned to coroframe_align()
*/
struct stack_resource {
explicit stack_resource(std::span<byte_t> bytes = {});
stack_resource(stack_resource&& other) noexcept;
stack_resource& operator=(stack_resource&& other) noexcept;
void* allocate(size_t len);
void deallocate(void* ptr, size_t len) noexcept;
// works as 'deallocate(ptr, size)' for all allocated chunks
void reuse_memory() noexcept;
};
example:
dd::task<void> foo(dd::with_stack_resource r) {
co_await bar(r); // pass and wait to preserve the allocation order
}

The default memory resource uses standard new / delete for allocations.
dd::pmr::polymorphic_resource is a small wrapper over std::pmr::memory_resource&, which adapts it for use with coroutines.
kelcoro has no external dependencies and can be built with CMake easily:
Preferred way, with CPM:
CPMAddPackage(
NAME KELCORO
GIT_REPOSITORY https://github.com/kelbon/kelcoro
GIT_TAG v1.4.1
)
target_link_libraries(MyTargetName kelcorolib)

or just use FetchContent:
include(FetchContent)
FetchContent_Declare(
kelcoro
GIT_REPOSITORY https://github.com/kelbon/kelcoro
GIT_TAG v1.4.1
)
FetchContent_MakeAvailable(kelcoro)
target_link_libraries(MyTargetName kelcorolib)