Allow disabling ad-hoc socket spawning

We'll need this for handling `IAudioProcessor` method calls in VST3. We
basically want a `Vst3MessageHandler` per `IAudioProcessor` instance,
but without the additional socket spawning or extra thread.
This commit is contained in:
Robbert van der Helm
2020-12-21 15:45:47 +01:00
parent ecd0de9d7d
commit 415c1b5683
3 changed files with 100 additions and 65 deletions
+86 -58
View File
@@ -447,8 +447,13 @@ class SocketHandler {
* *
* @tparam Thread The thread implementation to use. On the Linux side this * @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`. * should be `std::jthread` and on the Wine side this should be `Win32Thread`.
* @tparam ad_hoc_sockets Whether new sockets should be created on demand to be
* able to handle multiple function calls at the same time. If this is set to
* false, then simultaneous `send_message()` calls will have to wait for the
* earlier call to finish. This also means that the listening side does not
* have to spawn a thread to constantly listen for new connections.
*/ */
template <typename Thread> template <typename Thread, bool ad_hoc_sockets>
class AdHocSocketHandler { class AdHocSocketHandler {
protected: protected:
/** /**
@@ -530,11 +535,15 @@ class AdHocSocketHandler {
*/ */
template <typename T, typename F> template <typename T, typename F>
T send(F callback) { T send(F callback) {
// When the ad-hoc socket spawning behaviour is disabled we always want
// to acquire the lock, waiting if necessary.
// XXX: Maybe at some point we should benchmark how often this // XXX: Maybe at some point we should benchmark how often this
// ad hoc socket spawning mechanism gets used. If some hosts // ad hoc socket spawning mechanism gets used. If some hosts
// for instance consistently and repeatedly trigger this then // for instance consistently and repeatedly trigger this then
// we might be able to do some optimizations there. // we might be able to do some optimizations there.
std::unique_lock lock(write_mutex, std::try_to_lock); std::unique_lock lock =
ad_hoc_sockets ? std::unique_lock(write_mutex, std::try_to_lock)
: std::unique_lock(write_mutex);
if (lock.owns_lock()) { if (lock.owns_lock()) {
// This was used to always block when sending the first message, // This was used to always block when sending the first message,
// because the other side may not be listening for additional // because the other side may not be listening for additional
@@ -602,71 +611,90 @@ class AdHocSocketHandler {
void receive_multi(std::optional<std::reference_wrapper<Logger>> logger, void receive_multi(std::optional<std::reference_wrapper<Logger>> logger,
F primary_callback, F primary_callback,
G secondary_callback) { G secondary_callback) {
// As described above we'll handle incoming requests for `socket` on if constexpr (ad_hoc_sockets) {
// this thread. We'll also listen for incoming connections on `endpoint` // As described above we'll handle incoming requests for `socket` on
// on another thread. For any incoming connection we'll spawn a new // this thread. We'll also listen for incoming connections on
// thread to handle the request. When `socket` closes and this loop // `endpoint` on another thread. For any incoming connection we'll
// breaks, the listener and any still active threads will be cleaned up // spawn a new thread to handle the request. When `socket` closes
// before this function exits. // and this loop breaks, the listener and any still active threads
boost::asio::io_context secondary_context{}; // will be cleaned up before this function exits.
boost::asio::io_context secondary_context{};
// The previous acceptor has already been shut down by // The previous acceptor has already been shut down by
// `AdHocSocketHandler::connect()` // `AdHocSocketHandler::connect()`
acceptor.emplace(secondary_context, endpoint); acceptor.emplace(secondary_context, endpoint);
// This works the exact same way as `active_plugins` and // This works the exact same way as `active_plugins` and
// `next_plugin_id` in `GroupBridge` // `next_plugin_id` in `GroupBridge`
std::map<size_t, Thread> active_secondary_requests{}; std::map<size_t, Thread> active_secondary_requests{};
std::atomic_size_t next_request_id{}; std::atomic_size_t next_request_id{};
std::mutex active_secondary_requests_mutex{}; std::mutex active_secondary_requests_mutex{};
accept_requests( accept_requests(
*acceptor, logger, *acceptor, logger,
[&](boost::asio::local::stream_protocol::socket secondary_socket) { [&](boost::asio::local::stream_protocol::socket
const size_t request_id = next_request_id.fetch_add(1); secondary_socket) {
const size_t request_id = next_request_id.fetch_add(1);
// We have to make sure to keep moving these sockets into the // We have to make sure to keep moving these sockets into
// threads that will handle them // the threads that will handle them
std::lock_guard lock(active_secondary_requests_mutex); std::lock_guard lock(active_secondary_requests_mutex);
active_secondary_requests[request_id] = Thread( active_secondary_requests[request_id] = Thread(
[&, request_id](boost::asio::local::stream_protocol::socket [&,
secondary_socket) { request_id](boost::asio::local::stream_protocol::socket
secondary_callback(secondary_socket); secondary_socket) {
secondary_callback(secondary_socket);
// When we have processed this request, we'll join the // When we have processed this request, we'll join
// thread again with the thread that's handling // the thread again with the thread that's handling
// `secondary_context` // `secondary_context`
boost::asio::post(secondary_context, [&, request_id]() { boost::asio::post(
std::lock_guard lock( secondary_context, [&, request_id]() {
active_secondary_requests_mutex); std::lock_guard lock(
active_secondary_requests_mutex);
// The join is implicit because we're using // The join is implicit because we're using
// `std::jthread`/`Win32Thread` // `std::jthread`/`Win32Thread`
active_secondary_requests.erase(request_id); active_secondary_requests.erase(request_id);
}); });
}, },
std::move(secondary_socket)); std::move(secondary_socket));
}); });
Thread secondary_requests_handler([&]() { secondary_context.run(); }); Thread secondary_requests_handler(
[&]() { secondary_context.run(); });
// Now we'll handle reads on the primary socket in a loop until the // Now we'll handle reads on the primary socket in a loop until the
// socket shuts down // socket shuts down
while (true) { while (true) {
try { try {
primary_callback(socket); primary_callback(socket);
} catch (const boost::system::system_error&) { } catch (const boost::system::system_error&) {
// This happens when the sockets got closed because the plugin // This happens when the sockets got closed because the
// is being shut down // plugin is being shut down
break; break;
}
}
// After the primary socket gets terminated (during shutdown) we'll
// make sure all outstanding jobs have been processed and then drop
// all work from the IO context
std::lock_guard lock(active_secondary_requests_mutex);
secondary_context.stop();
acceptor.reset();
} else {
// If ad-hoc sockets are disabled, then we only care about the
// primary socket loop (e.g. when handing calls to
// `IAudioProcessor`, where we want the exact same behaviour as when
// handling normal VST3 messages but we don't want to spawn
// additional threads for performance considerations)
while (true) {
try {
primary_callback(socket);
} catch (const boost::system::system_error&) {
break;
}
} }
} }
// After the primary socket gets terminated (during shutdown) we'll make
// sure all outstanding jobs have been processed and then drop all work
// from the IO context
std::lock_guard lock(active_secondary_requests_mutex);
secondary_context.stop();
acceptor.reset();
} }
/** /**
+2 -2
View File
@@ -106,7 +106,7 @@ class DefaultDataConverter {
* should be `std::jthread` and on the Wine side this should be `Win32Thread`. * should be `std::jthread` and on the Wine side this should be `Win32Thread`.
*/ */
template <typename Thread> template <typename Thread>
class EventHandler : public AdHocSocketHandler<Thread> { class EventHandler : public AdHocSocketHandler<Thread, true> {
public: public:
/** /**
* Sets up a single main socket for this type of events. The sockets won't * Sets up a single main socket for this type of events. The sockets won't
@@ -125,7 +125,7 @@ class EventHandler : public AdHocSocketHandler<Thread> {
EventHandler(boost::asio::io_context& io_context, EventHandler(boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint, boost::asio::local::stream_protocol::endpoint endpoint,
bool listen) bool listen)
: AdHocSocketHandler<Thread>(io_context, endpoint, listen) {} : AdHocSocketHandler<Thread, true>(io_context, endpoint, listen) {}
/** /**
* Serialize and send an event over a socket. This is used for both the host * Serialize and send an event over a socket. This is used for both the host
+12 -5
View File
@@ -38,9 +38,14 @@
* @tparam Thread The thread implementation to use. On the Linux side this * @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`. * should be `std::jthread` and on the Wine side this should be `Win32Thread`.
* @tparam Request Either `ControlRequest` or `CallbackRequest`. * @tparam Request Either `ControlRequest` or `CallbackRequest`.
* @tparam ad_hoc_sockets Whether new sockets should be created on demand to be
* able to handle multiple function calls at the same time. If this is set to
* false, then simultaneous `send_message()` calls will have to wait for the
* earlier call to finish. This also means that the listening side does not
* have to spawn a thread to constantly listen for new connections.
*/ */
template <typename Thread, typename Request> template <typename Thread, typename Request, bool ad_hoc_sockets>
class Vst3MessageHandler : public AdHocSocketHandler<Thread> { class Vst3MessageHandler : public AdHocSocketHandler<Thread, ad_hoc_sockets> {
public: public:
/** /**
* Sets up a single main socket for this type of events. The sockets won't * Sets up a single main socket for this type of events. The sockets won't
@@ -59,7 +64,9 @@ class Vst3MessageHandler : public AdHocSocketHandler<Thread> {
Vst3MessageHandler(boost::asio::io_context& io_context, Vst3MessageHandler(boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint, boost::asio::local::stream_protocol::endpoint endpoint,
bool listen) bool listen)
: AdHocSocketHandler<Thread>(io_context, endpoint, listen) {} : AdHocSocketHandler<Thread, ad_hoc_sockets>(io_context,
endpoint,
listen) {}
/** /**
* Serialize and send an event over a socket and return the appropriate * Serialize and send an event over a socket and return the appropriate
@@ -272,12 +279,12 @@ class Vst3Sockets : public Sockets {
* This will be listened on by the Wine plugin host when it calls * This will be listened on by the Wine plugin host when it calls
* `receive_multi()`. * `receive_multi()`.
*/ */
Vst3MessageHandler<Thread, ControlRequest> host_vst_control; Vst3MessageHandler<Thread, ControlRequest, true> host_vst_control;
/** /**
* For sending callbacks from the plugin back to the host. After we have a * For sending callbacks from the plugin back to the host. After we have a
* better idea of what our communication model looks like we'll probably * better idea of what our communication model looks like we'll probably
* want to provide an abstraction similar to `EventHandler`. * want to provide an abstraction similar to `EventHandler`.
*/ */
Vst3MessageHandler<Thread, CallbackRequest> vst_host_callback; Vst3MessageHandler<Thread, CallbackRequest, true> vst_host_callback;
}; };