diff --git a/src/common/communication/common.h b/src/common/communication/common.h index 6d7055a7..08349eb2 100644 --- a/src/common/communication/common.h +++ b/src/common/communication/common.h @@ -488,25 +488,26 @@ class AdHocSocketHandler { * socket. This is either the primary `socket`, or a new ad hock socket if * this function is currently being called from another thread. * + * @tparam T The return value of F. * @tparam F A function in the form of - * `void(boost::asio::local::stream_protocol::socket&)`. + * `T(boost::asio::local::stream_protocol::socket&)`. */ - template - void send(F callback) { + template + T send(F callback) { // XXX: Maybe at some point we should benchmark how often this // ad hoc socket spawning mechanism gets used. If some hosts // for instance consistently and repeatedly trigger this then // we might be able to do some optimizations there. std::unique_lock lock(write_mutex, std::try_to_lock); if (lock.owns_lock()) { - callback(socket); + return callback(socket); } else { try { boost::asio::local::stream_protocol::socket secondary_socket( io_context); secondary_socket.connect(endpoint); - callback(secondary_socket); + return callback(secondary_socket); } catch (const boost::system::system_error&) { // So, what do we do when noone is listening on the endpoint // yet? This can happen with plugin groups when the Wine host @@ -517,7 +518,7 @@ class AdHocSocketHandler { // long living primary socket please let me know. std::lock_guard lock(write_mutex); - callback(socket); + return callback(socket); } } } diff --git a/src/common/communication/vst2.h b/src/common/communication/vst2.h index bdf9f934..da713612 100644 --- a/src/common/communication/vst2.h +++ b/src/common/communication/vst2.h @@ -77,10 +77,8 @@ class DefaultDataConverter { }; /** - * So, this is a bit of a mess. The TL;DR is that we want to use a single long - * living socket connection for `dispatch()` and another one for `audioMaster()` - * for performance reasons, but when the socket is already being written to we - * create new connections on demand. + * An instance of `AdHocSocketHandler` that can handle VST2 `dispatcher()` and + * `audioMaster()` events. * * For most of our sockets we can just send out our messages on the writing * side, and do a simple blocking loop on the reading side. The `dispatch()` and @@ -103,14 +101,11 @@ class DefaultDataConverter { * sets up asynchronous listeners for the socket endpoint, and then block and * handle events until the main socket is closed. * - * TODO: Factor out the on-demand socket spawning and handling logic so we can - * reuse most of this for the VST3 implementation - * * @tparam Thread The thread implementation to use. On the Linux side this * should be `std::jthread` and on the Wine side this should be `Win32Thread`. */ template -class EventHandler { +class EventHandler : public AdHocSocketHandler { public: /** * Sets up a single main socket for this type of events. The sockets won't @@ -129,45 +124,7 @@ class EventHandler { EventHandler(boost::asio::io_context& io_context, boost::asio::local::stream_protocol::endpoint endpoint, bool listen) - : io_context(io_context), endpoint(endpoint), socket(io_context) { - if (listen) { - boost::filesystem::create_directories( - boost::filesystem::path(endpoint.path()).parent_path()); - acceptor.emplace(io_context, endpoint); - } - } - - /** - * Depending on the value of the `listen` argument passed to the - * constructor, either accept connections made to the sockets on the Linux - * side or connect to the sockets on the Wine side - */ - void connect() { - if (acceptor) { - acceptor->accept(socket); - - // As mentioned in `acceptor's` docstring, this acceptor will be - // recreated in `receive_events()` on another context, and - // potentially on the other side of the connection in the case of - // `vst_host_callback` - acceptor.reset(); - boost::filesystem::remove(endpoint.path()); - } else { - socket.connect(endpoint); - } - } - - /** - * Close the socket. Both sides that are actively listening will be thrown a - * `boost::system_error` when this happens. - */ - void close() { - // The shutdown can fail when the socket is already closed - boost::system::error_code err; - socket.shutdown( - boost::asio::local::stream_protocol::socket::shutdown_both, err); - socket.close(); - } + : AdHocSocketHandler(io_context, endpoint, listen) {} /** * Serialize and send an event over a socket. This is used for both the host @@ -223,45 +180,14 @@ class EventHandler { .value_payload = value_payload}; // A socket only handles a single request at a time as to prevent - // messages from arriving out of order. For throughput reasons we prefer - // to do most communication over a single main socket (`socket`), and - // we'll lock `write_mutex` while doing so. In the event that the mutex - // is already locked and thus the main socket is currently in use by - // another thread, then we'll spawn a new socket to handle the request. - EventResult response; - { - // XXX: Maybe at some point we should benchmark how often this - // ad hoc socket spawning mechanism gets used. If some hosts - // for instance consistently and repeatedly trigger this then - // we might be able to do some optimizations there. - std::unique_lock lock(write_mutex, std::try_to_lock); - if (lock.owns_lock()) { + // messages from arriving out of order. `AdHocSocketHandler::send()` + // will either use a long-living primary socket, or if that's currently + // in use it will spawn a new socket for us. + EventResult response = this->template send( + [&](boost::asio::local::stream_protocol::socket& socket) { write_object(socket, event); - response = read_object(socket); - } else { - try { - boost::asio::local::stream_protocol::socket - secondary_socket(io_context); - secondary_socket.connect(endpoint); - - write_object(secondary_socket, event); - response = read_object(secondary_socket); - } catch (const boost::system::system_error&) { - // So, what do we do when noone is listening on the endpoint - // yet? This can happen with plugin groups when the Wine - // host process does an `audioMaster()` call before the - // plugin is listening. If that happens we'll fall back to a - // synchronous request. This is not very pretty, so if - // anyone can think of a better way to structure all of this - // while still mainting a long living primary socket please - // let me know. - std::lock_guard lock(write_mutex); - - write_object(socket, event); - response = read_object(socket); - } - } - } + return read_object(socket); + }); if (logging) { auto [logger, is_dispatch] = *logging; @@ -278,7 +204,7 @@ class EventHandler { /** * Spawn a new thread to listen for extra connections to `endpoint`, and - * then a blocking loop that handles events from the primary `socket`. + * then start a blocking loop that handles events from the primary `socket`. * * The specified function will be used to create an `EventResult` from an * `Event`. This is almost uses `passthrough_event()`, which converts a @@ -286,10 +212,6 @@ class EventHandler { * `audioMaster()` depending on the context, and then serializes the result * back into an `EventResultPayload`. * - * This function will also be used separately for receiving MIDI data, as - * some plugins will need pointers to received MIDI data to stay alive until - * the next audio buffer gets processed. - * * @param logging A pair containing a logger instance and whether or not * this is for sending `dispatch()` events or host callbacks. Optional * since it doesn't have to be done on both sides. @@ -330,143 +252,15 @@ class EventHandler { write_object(socket, response); }; - // As described above we'll handle incoming requests for `socket` on - // this thread. We'll also listen for incoming connections on `endpoint` - // on another thread. For any incoming connection we'll spawn a new - // thread to handle the request. When `socket` closes and this loop - // breaks, the listener and any still active threads will be cleaned up - // before this function exits. - boost::asio::io_context secondary_context{}; - - // The previous acceptor has already been shut down by - // `EventHandler::connect()` - acceptor.emplace(secondary_context, endpoint); - - // This works the exact same was as `active_plugins` and - // `next_plugin_id` in `GroupBridge` - std::map active_secondary_requests{}; - std::atomic_size_t next_request_id{}; - std::mutex active_secondary_requests_mutex{}; - accept_requests( - *acceptor, logging, - [&](boost::asio::local::stream_protocol::socket secondary_socket) { - const size_t request_id = next_request_id.fetch_add(1); - - // We have to make sure to keep moving these sockets into the - // threads that will handle them - std::lock_guard lock(active_secondary_requests_mutex); - active_secondary_requests[request_id] = Thread( - [&, request_id](boost::asio::local::stream_protocol::socket - secondary_socket) { - process_event(secondary_socket, false); - - // When we have processed this request, we'll join the - // thread again with the thread that's handling - // `secondary_context`. - boost::asio::post(secondary_context, [&, request_id]() { - std::lock_guard lock( - active_secondary_requests_mutex); - - // The join is implicit because we're using - // std::jthread/Win32Thread - active_secondary_requests.erase(request_id); - }); - }, - std::move(secondary_socket)); - }); - - Thread secondary_requests_handler([&]() { secondary_context.run(); }); - - while (true) { - try { + this->receive_multi( + logging, + [&](boost::asio::local::stream_protocol::socket& socket) { process_event(socket, true); - } catch (const boost::system::system_error&) { - // This happens when the sockets got closed because the plugin - // is being shut down - break; - } - } - - // After the main socket gets terminated (during shutdown) we'll make - // sure all outstanding jobs have been processed and then drop all work - // from the IO context - std::lock_guard lock(active_secondary_requests_mutex); - secondary_context.stop(); - acceptor.reset(); - } - - private: - /** - * Used in `receive_events()` to asynchronously listen for secondary socket - * connections. After `callback()` returns this function will continue to be - * called until the IO context gets stopped. - * - * @param acceptor The acceptor we will be listening on. - * @param logging A pair containing a logger instance and whether or not - * this is for sending `dispatch()` events or host callbacks. Optional - * since it doesn't have to be done on both sides. - * @param callback A function that handles the new socket connection. - * - * @tparam F A function in the form - * `void(boost::asio::local::stream_protocol::socket)` to handle a new - * incoming connection. - */ - template - void accept_requests( - boost::asio::local::stream_protocol::acceptor& acceptor, - std::optional> logging, - F callback) { - acceptor.async_accept( - [&, logging, callback]( - const boost::system::error_code& error, - boost::asio::local::stream_protocol::socket secondary_socket) { - if (error.failed()) { - // On the Wine side it's expected that the main socket - // connection will be dropped during shutdown, so we can - // silently ignore any related socket errors on the Wine - // side - if (logging) { - auto [logger, is_dispatch] = *logging; - logger.log("Failure while accepting connections: " + - error.message()); - } - - return; - } - - callback(std::move(secondary_socket)); - - accept_requests(acceptor, logging, callback); + }, + [&](boost::asio::local::stream_protocol::socket& socket) { + process_event(socket, false); }); } - - /** - * The main IO context. New sockets created during `send_event()` will be - * bound to this context. In `receive_events()` we'll create a new IO - * context since we want to do all listening there on a dedicated thread. - */ - boost::asio::io_context& io_context; - - boost::asio::local::stream_protocol::endpoint endpoint; - boost::asio::local::stream_protocol::socket socket; - - /** - * This acceptor will be used once synchronously on the listening side - * during `Sockets::connect()`. When `EventHandler::receive_events()` is - * then called, we'll recreate the acceptor to asynchronously listen for new - * incoming socket connections on `endpoint` using. This is important, - * because on the case of `vst_host_callback` the acceptor is first accepts - * an initial socket on the plugin side (like all sockets), but all - * additional incoming connections of course have to be listened for on the - * plugin side. - */ - std::optional acceptor; - - /** - * A mutex that locks the main `socket`. If this is locked, then any new - * events will be sent over a new socket instead. - */ - std::mutex write_mutex; }; /**