mirror of
https://github.com/robbert-vdh/yabridge.git
synced 2026-05-07 03:50:11 +02:00
Reimplement EventHandler on top of AdHocSocketHandler
This commit is contained in:
@@ -488,25 +488,26 @@ class AdHocSocketHandler {
|
|||||||
* socket. This is either the primary `socket`, or a new ad hock socket if
|
* socket. This is either the primary `socket`, or a new ad hock socket if
|
||||||
* this function is currently being called from another thread.
|
* this function is currently being called from another thread.
|
||||||
*
|
*
|
||||||
|
* @tparam T The return value of F.
|
||||||
* @tparam F A function in the form of
|
* @tparam F A function in the form of
|
||||||
* `void(boost::asio::local::stream_protocol::socket&)`.
|
* `T(boost::asio::local::stream_protocol::socket&)`.
|
||||||
*/
|
*/
|
||||||
template <typename F>
|
template <typename T, typename F>
|
||||||
void send(F callback) {
|
T send(F callback) {
|
||||||
// XXX: Maybe at some point we should benchmark how often this
|
// XXX: Maybe at some point we should benchmark how often this
|
||||||
// ad hoc socket spawning mechanism gets used. If some hosts
|
// ad hoc socket spawning mechanism gets used. If some hosts
|
||||||
// for instance consistently and repeatedly trigger this then
|
// for instance consistently and repeatedly trigger this then
|
||||||
// we might be able to do some optimizations there.
|
// we might be able to do some optimizations there.
|
||||||
std::unique_lock lock(write_mutex, std::try_to_lock);
|
std::unique_lock lock(write_mutex, std::try_to_lock);
|
||||||
if (lock.owns_lock()) {
|
if (lock.owns_lock()) {
|
||||||
callback(socket);
|
return callback(socket);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
boost::asio::local::stream_protocol::socket secondary_socket(
|
boost::asio::local::stream_protocol::socket secondary_socket(
|
||||||
io_context);
|
io_context);
|
||||||
secondary_socket.connect(endpoint);
|
secondary_socket.connect(endpoint);
|
||||||
|
|
||||||
callback(secondary_socket);
|
return callback(secondary_socket);
|
||||||
} catch (const boost::system::system_error&) {
|
} catch (const boost::system::system_error&) {
|
||||||
// So, what do we do when noone is listening on the endpoint
|
// So, what do we do when noone is listening on the endpoint
|
||||||
// yet? This can happen with plugin groups when the Wine host
|
// yet? This can happen with plugin groups when the Wine host
|
||||||
@@ -517,7 +518,7 @@ class AdHocSocketHandler {
|
|||||||
// long living primary socket please let me know.
|
// long living primary socket please let me know.
|
||||||
std::lock_guard lock(write_mutex);
|
std::lock_guard lock(write_mutex);
|
||||||
|
|
||||||
callback(socket);
|
return callback(socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+18
-224
@@ -77,10 +77,8 @@ class DefaultDataConverter {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* So, this is a bit of a mess. The TL;DR is that we want to use a single long
|
* An instance of `AdHocSocketHandler` that can handle VST2 `dispatcher()` and
|
||||||
* living socket connection for `dispatch()` and another one for `audioMaster()`
|
* `audioMaster()` events.
|
||||||
* for performance reasons, but when the socket is already being written to we
|
|
||||||
* create new connections on demand.
|
|
||||||
*
|
*
|
||||||
* For most of our sockets we can just send out our messages on the writing
|
* For most of our sockets we can just send out our messages on the writing
|
||||||
* side, and do a simple blocking loop on the reading side. The `dispatch()` and
|
* side, and do a simple blocking loop on the reading side. The `dispatch()` and
|
||||||
@@ -103,14 +101,11 @@ class DefaultDataConverter {
|
|||||||
* sets up asynchronous listeners for the socket endpoint, and then block and
|
* sets up asynchronous listeners for the socket endpoint, and then block and
|
||||||
* handle events until the main socket is closed.
|
* handle events until the main socket is closed.
|
||||||
*
|
*
|
||||||
* TODO: Factor out the on-demand socket spawning and handling logic so we can
|
|
||||||
* reuse most of this for the VST3 implementation
|
|
||||||
*
|
|
||||||
* @tparam Thread The thread implementation to use. On the Linux side this
|
* @tparam Thread The thread implementation to use. On the Linux side this
|
||||||
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
|
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
|
||||||
*/
|
*/
|
||||||
template <typename Thread>
|
template <typename Thread>
|
||||||
class EventHandler {
|
class EventHandler : public AdHocSocketHandler<Thread> {
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Sets up a single main socket for this type of events. The sockets won't
|
* Sets up a single main socket for this type of events. The sockets won't
|
||||||
@@ -129,45 +124,7 @@ class EventHandler {
|
|||||||
EventHandler(boost::asio::io_context& io_context,
|
EventHandler(boost::asio::io_context& io_context,
|
||||||
boost::asio::local::stream_protocol::endpoint endpoint,
|
boost::asio::local::stream_protocol::endpoint endpoint,
|
||||||
bool listen)
|
bool listen)
|
||||||
: io_context(io_context), endpoint(endpoint), socket(io_context) {
|
: AdHocSocketHandler<Thread>(io_context, endpoint, listen) {}
|
||||||
if (listen) {
|
|
||||||
boost::filesystem::create_directories(
|
|
||||||
boost::filesystem::path(endpoint.path()).parent_path());
|
|
||||||
acceptor.emplace(io_context, endpoint);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Depending on the value of the `listen` argument passed to the
|
|
||||||
* constructor, either accept connections made to the sockets on the Linux
|
|
||||||
* side or connect to the sockets on the Wine side
|
|
||||||
*/
|
|
||||||
void connect() {
|
|
||||||
if (acceptor) {
|
|
||||||
acceptor->accept(socket);
|
|
||||||
|
|
||||||
// As mentioned in `acceptor's` docstring, this acceptor will be
|
|
||||||
// recreated in `receive_events()` on another context, and
|
|
||||||
// potentially on the other side of the connection in the case of
|
|
||||||
// `vst_host_callback`
|
|
||||||
acceptor.reset();
|
|
||||||
boost::filesystem::remove(endpoint.path());
|
|
||||||
} else {
|
|
||||||
socket.connect(endpoint);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the socket. Both sides that are actively listening will be thrown a
|
|
||||||
* `boost::system_error` when this happens.
|
|
||||||
*/
|
|
||||||
void close() {
|
|
||||||
// The shutdown can fail when the socket is already closed
|
|
||||||
boost::system::error_code err;
|
|
||||||
socket.shutdown(
|
|
||||||
boost::asio::local::stream_protocol::socket::shutdown_both, err);
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialize and send an event over a socket. This is used for both the host
|
* Serialize and send an event over a socket. This is used for both the host
|
||||||
@@ -223,45 +180,14 @@ class EventHandler {
|
|||||||
.value_payload = value_payload};
|
.value_payload = value_payload};
|
||||||
|
|
||||||
// A socket only handles a single request at a time as to prevent
|
// A socket only handles a single request at a time as to prevent
|
||||||
// messages from arriving out of order. For throughput reasons we prefer
|
// messages from arriving out of order. `AdHocSocketHandler::send()`
|
||||||
// to do most communication over a single main socket (`socket`), and
|
// will either use a long-living primary socket, or if that's currently
|
||||||
// we'll lock `write_mutex` while doing so. In the event that the mutex
|
// in use it will spawn a new socket for us.
|
||||||
// is already locked and thus the main socket is currently in use by
|
EventResult response = this->template send<EventResult>(
|
||||||
// another thread, then we'll spawn a new socket to handle the request.
|
[&](boost::asio::local::stream_protocol::socket& socket) {
|
||||||
EventResult response;
|
|
||||||
{
|
|
||||||
// XXX: Maybe at some point we should benchmark how often this
|
|
||||||
// ad hoc socket spawning mechanism gets used. If some hosts
|
|
||||||
// for instance consistently and repeatedly trigger this then
|
|
||||||
// we might be able to do some optimizations there.
|
|
||||||
std::unique_lock lock(write_mutex, std::try_to_lock);
|
|
||||||
if (lock.owns_lock()) {
|
|
||||||
write_object(socket, event);
|
write_object(socket, event);
|
||||||
response = read_object<EventResult>(socket);
|
return read_object<EventResult>(socket);
|
||||||
} else {
|
});
|
||||||
try {
|
|
||||||
boost::asio::local::stream_protocol::socket
|
|
||||||
secondary_socket(io_context);
|
|
||||||
secondary_socket.connect(endpoint);
|
|
||||||
|
|
||||||
write_object(secondary_socket, event);
|
|
||||||
response = read_object<EventResult>(secondary_socket);
|
|
||||||
} catch (const boost::system::system_error&) {
|
|
||||||
// So, what do we do when noone is listening on the endpoint
|
|
||||||
// yet? This can happen with plugin groups when the Wine
|
|
||||||
// host process does an `audioMaster()` call before the
|
|
||||||
// plugin is listening. If that happens we'll fall back to a
|
|
||||||
// synchronous request. This is not very pretty, so if
|
|
||||||
// anyone can think of a better way to structure all of this
|
|
||||||
// while still mainting a long living primary socket please
|
|
||||||
// let me know.
|
|
||||||
std::lock_guard lock(write_mutex);
|
|
||||||
|
|
||||||
write_object(socket, event);
|
|
||||||
response = read_object<EventResult>(socket);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (logging) {
|
if (logging) {
|
||||||
auto [logger, is_dispatch] = *logging;
|
auto [logger, is_dispatch] = *logging;
|
||||||
@@ -278,7 +204,7 @@ class EventHandler {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Spawn a new thread to listen for extra connections to `endpoint`, and
|
* Spawn a new thread to listen for extra connections to `endpoint`, and
|
||||||
* then a blocking loop that handles events from the primary `socket`.
|
* then start a blocking loop that handles events from the primary `socket`.
|
||||||
*
|
*
|
||||||
* The specified function will be used to create an `EventResult` from an
|
* The specified function will be used to create an `EventResult` from an
|
||||||
* `Event`. This is almost uses `passthrough_event()`, which converts a
|
* `Event`. This is almost uses `passthrough_event()`, which converts a
|
||||||
@@ -286,10 +212,6 @@ class EventHandler {
|
|||||||
* `audioMaster()` depending on the context, and then serializes the result
|
* `audioMaster()` depending on the context, and then serializes the result
|
||||||
* back into an `EventResultPayload`.
|
* back into an `EventResultPayload`.
|
||||||
*
|
*
|
||||||
* This function will also be used separately for receiving MIDI data, as
|
|
||||||
* some plugins will need pointers to received MIDI data to stay alive until
|
|
||||||
* the next audio buffer gets processed.
|
|
||||||
*
|
|
||||||
* @param logging A pair containing a logger instance and whether or not
|
* @param logging A pair containing a logger instance and whether or not
|
||||||
* this is for sending `dispatch()` events or host callbacks. Optional
|
* this is for sending `dispatch()` events or host callbacks. Optional
|
||||||
* since it doesn't have to be done on both sides.
|
* since it doesn't have to be done on both sides.
|
||||||
@@ -330,143 +252,15 @@ class EventHandler {
|
|||||||
write_object(socket, response);
|
write_object(socket, response);
|
||||||
};
|
};
|
||||||
|
|
||||||
// As described above we'll handle incoming requests for `socket` on
|
this->receive_multi(
|
||||||
// this thread. We'll also listen for incoming connections on `endpoint`
|
logging,
|
||||||
// on another thread. For any incoming connection we'll spawn a new
|
[&](boost::asio::local::stream_protocol::socket& socket) {
|
||||||
// thread to handle the request. When `socket` closes and this loop
|
|
||||||
// breaks, the listener and any still active threads will be cleaned up
|
|
||||||
// before this function exits.
|
|
||||||
boost::asio::io_context secondary_context{};
|
|
||||||
|
|
||||||
// The previous acceptor has already been shut down by
|
|
||||||
// `EventHandler::connect()`
|
|
||||||
acceptor.emplace(secondary_context, endpoint);
|
|
||||||
|
|
||||||
// This works the exact same was as `active_plugins` and
|
|
||||||
// `next_plugin_id` in `GroupBridge`
|
|
||||||
std::map<size_t, Thread> active_secondary_requests{};
|
|
||||||
std::atomic_size_t next_request_id{};
|
|
||||||
std::mutex active_secondary_requests_mutex{};
|
|
||||||
accept_requests(
|
|
||||||
*acceptor, logging,
|
|
||||||
[&](boost::asio::local::stream_protocol::socket secondary_socket) {
|
|
||||||
const size_t request_id = next_request_id.fetch_add(1);
|
|
||||||
|
|
||||||
// We have to make sure to keep moving these sockets into the
|
|
||||||
// threads that will handle them
|
|
||||||
std::lock_guard lock(active_secondary_requests_mutex);
|
|
||||||
active_secondary_requests[request_id] = Thread(
|
|
||||||
[&, request_id](boost::asio::local::stream_protocol::socket
|
|
||||||
secondary_socket) {
|
|
||||||
process_event(secondary_socket, false);
|
|
||||||
|
|
||||||
// When we have processed this request, we'll join the
|
|
||||||
// thread again with the thread that's handling
|
|
||||||
// `secondary_context`.
|
|
||||||
boost::asio::post(secondary_context, [&, request_id]() {
|
|
||||||
std::lock_guard lock(
|
|
||||||
active_secondary_requests_mutex);
|
|
||||||
|
|
||||||
// The join is implicit because we're using
|
|
||||||
// std::jthread/Win32Thread
|
|
||||||
active_secondary_requests.erase(request_id);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
std::move(secondary_socket));
|
|
||||||
});
|
|
||||||
|
|
||||||
Thread secondary_requests_handler([&]() { secondary_context.run(); });
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
process_event(socket, true);
|
process_event(socket, true);
|
||||||
} catch (const boost::system::system_error&) {
|
},
|
||||||
// This happens when the sockets got closed because the plugin
|
[&](boost::asio::local::stream_protocol::socket& socket) {
|
||||||
// is being shut down
|
process_event(socket, false);
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// After the main socket gets terminated (during shutdown) we'll make
|
|
||||||
// sure all outstanding jobs have been processed and then drop all work
|
|
||||||
// from the IO context
|
|
||||||
std::lock_guard lock(active_secondary_requests_mutex);
|
|
||||||
secondary_context.stop();
|
|
||||||
acceptor.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
/**
|
|
||||||
* Used in `receive_events()` to asynchronously listen for secondary socket
|
|
||||||
* connections. After `callback()` returns this function will continue to be
|
|
||||||
* called until the IO context gets stopped.
|
|
||||||
*
|
|
||||||
* @param acceptor The acceptor we will be listening on.
|
|
||||||
* @param logging A pair containing a logger instance and whether or not
|
|
||||||
* this is for sending `dispatch()` events or host callbacks. Optional
|
|
||||||
* since it doesn't have to be done on both sides.
|
|
||||||
* @param callback A function that handles the new socket connection.
|
|
||||||
*
|
|
||||||
* @tparam F A function in the form
|
|
||||||
* `void(boost::asio::local::stream_protocol::socket)` to handle a new
|
|
||||||
* incoming connection.
|
|
||||||
*/
|
|
||||||
template <typename F>
|
|
||||||
void accept_requests(
|
|
||||||
boost::asio::local::stream_protocol::acceptor& acceptor,
|
|
||||||
std::optional<std::pair<Logger&, bool>> logging,
|
|
||||||
F callback) {
|
|
||||||
acceptor.async_accept(
|
|
||||||
[&, logging, callback](
|
|
||||||
const boost::system::error_code& error,
|
|
||||||
boost::asio::local::stream_protocol::socket secondary_socket) {
|
|
||||||
if (error.failed()) {
|
|
||||||
// On the Wine side it's expected that the main socket
|
|
||||||
// connection will be dropped during shutdown, so we can
|
|
||||||
// silently ignore any related socket errors on the Wine
|
|
||||||
// side
|
|
||||||
if (logging) {
|
|
||||||
auto [logger, is_dispatch] = *logging;
|
|
||||||
logger.log("Failure while accepting connections: " +
|
|
||||||
error.message());
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
callback(std::move(secondary_socket));
|
|
||||||
|
|
||||||
accept_requests(acceptor, logging, callback);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The main IO context. New sockets created during `send_event()` will be
|
|
||||||
* bound to this context. In `receive_events()` we'll create a new IO
|
|
||||||
* context since we want to do all listening there on a dedicated thread.
|
|
||||||
*/
|
|
||||||
boost::asio::io_context& io_context;
|
|
||||||
|
|
||||||
boost::asio::local::stream_protocol::endpoint endpoint;
|
|
||||||
boost::asio::local::stream_protocol::socket socket;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This acceptor will be used once synchronously on the listening side
|
|
||||||
* during `Sockets::connect()`. When `EventHandler::receive_events()` is
|
|
||||||
* then called, we'll recreate the acceptor to asynchronously listen for new
|
|
||||||
* incoming socket connections on `endpoint` using. This is important,
|
|
||||||
* because on the case of `vst_host_callback` the acceptor is first accepts
|
|
||||||
* an initial socket on the plugin side (like all sockets), but all
|
|
||||||
* additional incoming connections of course have to be listened for on the
|
|
||||||
* plugin side.
|
|
||||||
*/
|
|
||||||
std::optional<boost::asio::local::stream_protocol::acceptor> acceptor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A mutex that locks the main `socket`. If this is locked, then any new
|
|
||||||
* events will be sent over a new socket instead.
|
|
||||||
*/
|
|
||||||
std::mutex write_mutex;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user