Revert "Allow disabling ad-hoc socket spawning"

It turns out we can't safely disable this, because in some situations we
still have these mutually recursive function calls. We could optimize
this a bit to have those calls be handled by the general sockets, but
this is much more manageable.

This reverts commit 415c1b5683.
This commit is contained in:
Robbert van der Helm
2020-12-29 00:25:44 +01:00
parent 4226ab6e43
commit 33806139e9
3 changed files with 66 additions and 101 deletions
+58 -86
View File
@@ -452,13 +452,8 @@ class SocketHandler {
*
* @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
* @tparam ad_hoc_sockets Whether new sockets should be created on demand to be
* able to handle multiple function calls at the same time. If this is set to
* false, then simultaneous `send_message()` calls will have to wait for the
* earlier call to finish. This also means that the listening side does not
* have to spawn a thread to constantly listen for new connections.
*/
template <typename Thread, bool ad_hoc_sockets>
template <typename Thread>
class AdHocSocketHandler {
protected:
/**
@@ -542,15 +537,11 @@ class AdHocSocketHandler {
*/
template <typename T, typename F>
T send(F callback) {
// When the ad-hoc socket spawning bheaviour is disabled we always want
// to acquire the lock, waiting if necessary.
// XXX: Maybe at some point we should benchmark how often this
// ad hoc socket spawning mechanism gets used. If some hosts
// for instance consistently and repeatedly trigger this then
// we might be able to do some optimizations there.
std::unique_lock lock =
ad_hoc_sockets ? std::unique_lock(write_mutex, std::try_to_lock)
: std::unique_lock(write_mutex);
std::unique_lock lock(write_mutex, std::try_to_lock);
if (lock.owns_lock()) {
// This was used to always block when sending the first message,
// because the other side may not be listening for additional
@@ -618,90 +609,71 @@ class AdHocSocketHandler {
void receive_multi(std::optional<std::reference_wrapper<Logger>> logger,
F primary_callback,
G secondary_callback) {
if constexpr (ad_hoc_sockets) {
// As described above we'll handle incoming requests for `socket` on
// this thread. We'll also listen for incoming connections on
// `endpoint` on another thread. For any incoming connection we'll
// spawn a new thread to handle the request. When `socket` closes
// and this loop breaks, the listener and any still active threads
// will be cleaned up before this function exits.
boost::asio::io_context secondary_context{};
// As described above we'll handle incoming requests for `socket` on
// this thread. We'll also listen for incoming connections on `endpoint`
// on another thread. For any incoming connection we'll spawn a new
// thread to handle the request. When `socket` closes and this loop
// breaks, the listener and any still active threads will be cleaned up
// before this function exits.
boost::asio::io_context secondary_context{};
// The previous acceptor has already been shut down by
// `AdHocSocketHandler::connect()`
acceptor.emplace(secondary_context, endpoint);
// The previous acceptor has already been shut down by
// `AdHocSocketHandler::connect()`
acceptor.emplace(secondary_context, endpoint);
// This works the exact same was as `active_plugins` and
// `next_plugin_id` in `GroupBridge`
std::map<size_t, Thread> active_secondary_requests{};
std::atomic_size_t next_request_id{};
std::mutex active_secondary_requests_mutex{};
accept_requests(
*acceptor, logger,
[&](boost::asio::local::stream_protocol::socket
secondary_socket) {
const size_t request_id = next_request_id.fetch_add(1);
// This works the exact same was as `active_plugins` and
// `next_plugin_id` in `GroupBridge`
std::map<size_t, Thread> active_secondary_requests{};
std::atomic_size_t next_request_id{};
std::mutex active_secondary_requests_mutex{};
accept_requests(
*acceptor, logger,
[&](boost::asio::local::stream_protocol::socket secondary_socket) {
const size_t request_id = next_request_id.fetch_add(1);
// We have to make sure to keep moving these sockets into
// the threads that will handle them
std::lock_guard lock(active_secondary_requests_mutex);
active_secondary_requests[request_id] = Thread(
[&,
request_id](boost::asio::local::stream_protocol::socket
secondary_socket) {
secondary_callback(secondary_socket);
// We have to make sure to keep moving these sockets into the
// threads that will handle them
std::lock_guard lock(active_secondary_requests_mutex);
active_secondary_requests[request_id] = Thread(
[&, request_id](boost::asio::local::stream_protocol::socket
secondary_socket) {
secondary_callback(secondary_socket);
// When we have processed this request, we'll join
// the thread again with the thread that's handling
// `secondary_context`
boost::asio::post(
secondary_context, [&, request_id]() {
std::lock_guard lock(
active_secondary_requests_mutex);
// When we have processed this request, we'll join the
// thread again with the thread that's handling
// `secondary_context`
boost::asio::post(secondary_context, [&, request_id]() {
std::lock_guard lock(
active_secondary_requests_mutex);
// The join is implicit because we're using
// `std::jthread`/`Win32Thread`
active_secondary_requests.erase(request_id);
});
},
std::move(secondary_socket));
});
// The join is implicit because we're using
// `std::jthread`/`Win32Thread`
active_secondary_requests.erase(request_id);
});
},
std::move(secondary_socket));
});
Thread secondary_requests_handler(
[&]() { secondary_context.run(); });
Thread secondary_requests_handler([&]() { secondary_context.run(); });
// Now we'll handle reads on the primary socket in a loop until the
// socket shuts down
while (true) {
try {
primary_callback(socket);
} catch (const boost::system::system_error&) {
// This happens when the sockets got closed because the
// plugin is being shut down
break;
}
}
// After the primary socket gets terminated (during shutdown) we'll
// make sure all outstanding jobs have been processed and then drop
// all work from the IO context
std::lock_guard lock(active_secondary_requests_mutex);
secondary_context.stop();
acceptor.reset();
} else {
// If ad-hoc sockets are disabled, then we only care about the
// primary socket loop (e.g. when handing calls to
// `IAudioProcessor`, where we want the exact same behaviour as when
// handling normal VST3 messages but we don't want to spawn
// additional threads for perofrmance considerations)
while (true) {
try {
primary_callback(socket);
} catch (const boost::system::system_error&) {
break;
}
// Now we'll handle reads on the primary socket in a loop until the
// socket shuts down
while (true) {
try {
primary_callback(socket);
} catch (const boost::system::system_error&) {
// This happens when the sockets got closed because the plugin
// is being shut down
break;
}
}
// After the primary socket gets terminated (during shutdown) we'll make
// sure all outstanding jobs have been processed and then drop all work
// from the IO context
std::lock_guard lock(active_secondary_requests_mutex);
secondary_context.stop();
acceptor.reset();
}
/**
+2 -2
View File
@@ -106,7 +106,7 @@ class DefaultDataConverter {
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
*/
template <typename Thread>
class EventHandler : public AdHocSocketHandler<Thread, true> {
class EventHandler : public AdHocSocketHandler<Thread> {
public:
/**
* Sets up a single main socket for this type of events. The sockets won't
@@ -125,7 +125,7 @@ class EventHandler : public AdHocSocketHandler<Thread, true> {
EventHandler(boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint,
bool listen)
: AdHocSocketHandler<Thread, true>(io_context, endpoint, listen) {}
: AdHocSocketHandler<Thread>(io_context, endpoint, listen) {}
/**
* Serialize and send an event over a socket. This is used for both the host
+6 -13
View File
@@ -39,14 +39,9 @@
* @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
* @tparam Request Either `ControlRequest` or `CallbackRequest`.
* @tparam ad_hoc_sockets Whether new sockets should be created on demand to be
* able to handle multiple function calls at the same time. If this is set to
* false, then simultaneous `send_message()` calls will have to wait for the
* earlier call to finish. This also means that the listening side does not
* have to spawn a thread to constantly listen for new connections.
*/
template <typename Thread, typename Request, bool ad_hoc_sockets>
class Vst3MessageHandler : public AdHocSocketHandler<Thread, ad_hoc_sockets> {
template <typename Thread, typename Request>
class Vst3MessageHandler : public AdHocSocketHandler<Thread> {
public:
/**
* Sets up a single main socket for this type of events. The sockets won't
@@ -65,9 +60,7 @@ class Vst3MessageHandler : public AdHocSocketHandler<Thread, ad_hoc_sockets> {
Vst3MessageHandler(boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint,
bool listen)
: AdHocSocketHandler<Thread, ad_hoc_sockets>(io_context,
endpoint,
listen) {}
: AdHocSocketHandler<Thread>(io_context, endpoint, listen) {}
/**
* Serialize and send an event over a socket and return the appropriate
@@ -447,14 +440,14 @@ class Vst3Sockets : public Sockets {
* This will be listened on by the Wine plugin host when it calls
* `receive_multi()`.
*/
Vst3MessageHandler<Thread, ControlRequest, true> host_vst_control;
Vst3MessageHandler<Thread, ControlRequest> host_vst_control;
/**
* For sending callbacks from the plugin back to the host. After we have a
* better idea of what our communication model looks like we'll probably
* want to provide an abstraction similar to `EventHandler`.
*/
Vst3MessageHandler<Thread, CallbackRequest, true> vst_host_callback;
Vst3MessageHandler<Thread, CallbackRequest> vst_host_callback;
private:
boost::asio::io_context& io_context;
@@ -470,7 +463,7 @@ class Vst3Sockets : public Sockets {
* would have one dedicated thread for handling function calls to these
* interfaces, and then another dedicated thread just idling around.
*/
std::map<size_t, Vst3MessageHandler<Thread, AudioProcessorRequest, false>>
std::map<size_t, Vst3MessageHandler<Thread, AudioProcessorRequest>>
audio_processor_sockets;
/**
* Binary buffers used for serializing objects and receiving messages into