Listen for incoming secondary event requests

This commit is contained in:
Robbert van der Helm
2020-10-26 12:58:03 +01:00
parent 523ac64d11
commit 81efa6febe
+120 -6
View File
@@ -16,6 +16,10 @@
#pragma once
#include <atomic>
#include <iostream>
#include <thread>
#include <bitsery/adapter/buffer.h>
#include <bitsery/bitsery.h>
@@ -331,18 +335,80 @@ class EventHandler {
* the next audio buffer gets processed.
*
* @param logging A pair containing a logger instance and whether or not
* this is for sending `dispatch()` events or host callbacks. Optional since
* it doesn't have to be done on both sides.
* this is for sending `dispatch()` events or host callbacks. Optional
* since it doesn't have to be done on both sides.
* @param callback The function used to generate a response out of an event.
*
* @tparam F A function type in the form of `EventResponse(Event)`.
*
* TODO: Add a boolean flag indicating that this is being called
* from an off thread
*
* @relates EventHandler::send
* @relates passthrough_event
*/
template <typename F>
void receive(std::optional<std::pair<Logger&, bool>> logging, F callback) {
// TODO: Listen for new incoming connections
assert(acceptor.has_value());
// As described above we'll handle incoming requests for `socket` on
// this thread. We'll also listen for incoming connections on `endpoint`
// on another thread. For any incoming connection we'll spawn a new
// thread to handle the request. When `socket` closes and this loop
// breaks, the listener and any still active threads will be cleaned up
// before this function exits.
boost::asio::io_context secondary_context{};
// This works the exact same was as `active_plugins` and
// `next_plugin_id` in `GroupBridge`
std::map<size_t, std::jthread> active_secondary_requests{};
std::atomic_size_t next_request_id{};
std::mutex active_secondary_requests_mutex;
accept_requests(
*acceptor, logging,
[&](boost::asio::local::stream_protocol::socket secondary_socket) {
const size_t request_id = next_request_id.fetch_add(1);
std::lock_guard lock(active_secondary_requests_mutex);
active_secondary_requests[request_id] =
std::jthread([&, request_id]() {
// TODO: Factor this out
auto event = read_object<Event>(secondary_socket);
if (logging) {
auto [logger, is_dispatch] = *logging;
logger.log_event(is_dispatch, event.opcode,
event.index, event.value,
event.payload, event.option,
event.value_payload);
}
EventResult response = callback(event);
if (logging) {
auto [logger, is_dispatch] = *logging;
logger.log_event_response(is_dispatch, event.opcode,
response.return_value,
response.payload,
response.value_payload);
}
write_object(secondary_socket, response);
// When we have processed this request, we'll join the
// thread again with the thread that's handling
// `secondary_context`.
boost::asio::post(secondary_context, [&, request_id]() {
std::lock_guard lock(
active_secondary_requests_mutex);
// The join is implicit because we're using
// std::jthread
active_secondary_requests.erase(request_id);
});
});
});
std::jthread secondary_requests_handler(
[&]() { secondary_context.run(); });
while (true) {
try {
@@ -369,9 +435,57 @@ class EventHandler {
break;
}
}
// When the socket gets terminated we'll stop accepting new connections
// and thus terminate `secondary_requests_handler`. Dropping
// `active_secondary_requests` will wait for all threads to join.
secondary_context.stop();
}
private:
/**
* Used in `receive()` to asynchronously listen for secondary socket
* connections. After `callback()` returns this function will continue to be
* called until the IO context gets stopped.
*
* @param acceptor The acceptor we will be listening on.
* @param logging A pair containing a logger instance and whether or not
* this is for sending `dispatch()` events or host callbacks. Optional
* since it doesn't have to be done on both sides.
* @param callback A function that handles the new socket connection.
*
* @tparam F A function in the form
* `void(boost::asio::local::stream_protocol::socket)` to handle a new
* incoming connection.
*/
template <typename F>
void accept_requests(
boost::asio::local::stream_protocol::acceptor& acceptor,
std::optional<std::pair<Logger&, bool>> logging,
F callback) {
acceptor.async_accept(
[&](const boost::system::error_code& error,
boost::asio::local::stream_protocol::socket secondary_socket) {
//
if (error.failed()) {
if (logging) {
auto [logger, is_dispatch] = *logging;
logger.log("Failure while accepting connections: " +
error.message());
} else {
std::cerr << "Failure while accepting connections: "
<< error.message() << std::endl;
}
return;
}
callback(std::move(secondary_socket));
accept_requests(acceptor, logging, callback);
});
}
/**
* The main IO context. New sockets created during `send()` will be bound to
* this context. In `receive()` we'll create a new IO context since we want
@@ -384,9 +498,9 @@ class EventHandler {
/**
* This acceptor will be used once synchronously on the listening side
* during `Sockets::connect()`. When `EventHandler::receive()` is called
* this will be replaced by a new acceptor bound to a new IO context to
* receive any additional incoming connections.
* during `Sockets::connect()`. When `EventHandler::receive()` is then
* called, we'll asynchronously listen for new incoming socket connections
* on `endpoint` using this same acceptor.
*/
std::optional<boost::asio::local::stream_protocol::acceptor> acceptor;