From 81efa6febe5689a9f6686c98f3b24f039d15fdd9 Mon Sep 17 00:00:00 2001 From: Robbert van der Helm Date: Mon, 26 Oct 2020 12:58:03 +0100 Subject: [PATCH] Listen for incoming secondary event requests --- src/common/communication.h | 126 +++++++++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 6 deletions(-) diff --git a/src/common/communication.h b/src/common/communication.h index 4c0dbae5..1e5e2529 100644 --- a/src/common/communication.h +++ b/src/common/communication.h @@ -16,6 +16,10 @@ #pragma once +#include +#include +#include + #include #include @@ -331,18 +335,80 @@ class EventHandler { * the next audio buffer gets processed. * * @param logging A pair containing a logger instance and whether or not - * this is for sending `dispatch()` events or host callbacks. Optional since - * it doesn't have to be done on both sides. + * this is for sending `dispatch()` events or host callbacks. Optional + * since it doesn't have to be done on both sides. * @param callback The function used to generate a response out of an event. * * @tparam F A function type in the form of `EventResponse(Event)`. * + * TODO: Add a boolean flag indicating that this is being called + * from an off thread + * * @relates EventHandler::send * @relates passthrough_event */ template void receive(std::optional> logging, F callback) { - // TODO: Listen for new incoming connections + assert(acceptor.has_value()); + + // As described above we'll handle incoming requests for `socket` on + // this thread. We'll also listen for incoming connections on `endpoint` + // on another thread. For any incoming connection we'll spawn a new + // thread to handle the request. When `socket` closes and this loop + // breaks, the listener and any still active threads will be cleaned up + // before this function exits. + boost::asio::io_context secondary_context{}; + + // This works the exact same was as `active_plugins` and + // `next_plugin_id` in `GroupBridge` + std::map active_secondary_requests{}; + std::atomic_size_t next_request_id{}; + std::mutex active_secondary_requests_mutex; + accept_requests( + *acceptor, logging, + [&](boost::asio::local::stream_protocol::socket secondary_socket) { + const size_t request_id = next_request_id.fetch_add(1); + + std::lock_guard lock(active_secondary_requests_mutex); + active_secondary_requests[request_id] = + std::jthread([&, request_id]() { + // TODO: Factor this out + auto event = read_object(secondary_socket); + if (logging) { + auto [logger, is_dispatch] = *logging; + logger.log_event(is_dispatch, event.opcode, + event.index, event.value, + event.payload, event.option, + event.value_payload); + } + + EventResult response = callback(event); + if (logging) { + auto [logger, is_dispatch] = *logging; + logger.log_event_response(is_dispatch, event.opcode, + response.return_value, + response.payload, + response.value_payload); + } + + write_object(secondary_socket, response); + + // When we have processed this request, we'll join the + // thread again with the thread that's handling + // `secondary_context`. + boost::asio::post(secondary_context, [&, request_id]() { + std::lock_guard lock( + active_secondary_requests_mutex); + + // The join is implicit because we're using + // std::jthread + active_secondary_requests.erase(request_id); + }); + }); + }); + + std::jthread secondary_requests_handler( + [&]() { secondary_context.run(); }); while (true) { try { @@ -369,9 +435,57 @@ class EventHandler { break; } } + + // When the socket gets terminated we'll stop accepting new connections + // and thus terminate `secondary_requests_handler`. Dropping + // `active_secondary_requests` will wait for all threads to join. + secondary_context.stop(); } private: + /** + * Used in `receive()` to asynchronously listen for secondary socket + * connections. After `callback()` returns this function will continue to be + * called until the IO context gets stopped. + * + * @param acceptor The acceptor we will be listening on. + * @param logging A pair containing a logger instance and whether or not + * this is for sending `dispatch()` events or host callbacks. Optional + * since it doesn't have to be done on both sides. + * @param callback A function that handles the new socket connection. + * + * @tparam F A function in the form + * `void(boost::asio::local::stream_protocol::socket)` to handle a new + * incoming connection. + */ + template + void accept_requests( + boost::asio::local::stream_protocol::acceptor& acceptor, + std::optional> logging, + F callback) { + acceptor.async_accept( + [&](const boost::system::error_code& error, + boost::asio::local::stream_protocol::socket secondary_socket) { + // + if (error.failed()) { + if (logging) { + auto [logger, is_dispatch] = *logging; + logger.log("Failure while accepting connections: " + + error.message()); + } else { + std::cerr << "Failure while accepting connections: " + << error.message() << std::endl; + } + + return; + } + + callback(std::move(secondary_socket)); + + accept_requests(acceptor, logging, callback); + }); + } + /** * The main IO context. New sockets created during `send()` will be bound to * this context. In `receive()` we'll create a new IO context since we want @@ -384,9 +498,9 @@ class EventHandler { /** * This acceptor will be used once synchronously on the listening side - * during `Sockets::connect()`. When `EventHandler::receive()` is called - * this will be replaced by a new acceptor bound to a new IO context to - * receive any additional incoming connections. + * during `Sockets::connect()`. When `EventHandler::receive()` is then + * called, we'll asynchronously listen for new incoming socket connections + * on `endpoint` using this same acceptor. */ std::optional acceptor;