Create secondary sockets for sending nested events

This commit is contained in:
Robbert van der Helm
2020-10-26 12:13:27 +01:00
parent d82f8463d9
commit 2f6883977f
2 changed files with 26 additions and 10 deletions
+1 -1
View File
@@ -85,7 +85,7 @@ EventHandler::EventHandler(
boost::asio::io_context& io_context, boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint, boost::asio::local::stream_protocol::endpoint endpoint,
bool listen) bool listen)
: endpoint(endpoint), socket(io_context) { : io_context(io_context), endpoint(endpoint), socket(io_context) {
if (listen) { if (listen) {
fs::create_directories(fs::path(endpoint.path()).parent_path()); fs::create_directories(fs::path(endpoint.path()).parent_path());
acceptor.emplace(io_context, endpoint); acceptor.emplace(io_context, endpoint);
+25 -9
View File
@@ -259,8 +259,6 @@ class EventHandler {
intptr_t value, intptr_t value,
void* data, void* data,
float option) { float option) {
// TODO: Create a new socket if the mutex is locked
// Encode the right payload types for this event. Check the // Encode the right payload types for this event. Check the
// documentation for `EventPayload` for more information. These types // documentation for `EventPayload` for more information. These types
// are converted to C-style data structures in `passthrough_event()` so // are converted to C-style data structures in `passthrough_event()` so
@@ -283,15 +281,26 @@ class EventHandler {
.payload = payload, .payload = payload,
.value_payload = value_payload}; .value_payload = value_payload};
// Prevent two threads from writing over the socket at the same time and // A socket only handles a single request at a time as to prevent
// messages getting out of order. This is needed because we can't // messages from arriving out of order. For throughput reasons we prefer
// prevent the plugin or the host from calling `dispatch()` or // to do most communication over a single main socket (`socket`), and
// `audioMaster()` from multiple threads. // we'll lock `write_mutex` while doing so. In the event that the mutex
// is already locked and thus the main socket is currently in use by
// another thread, then we'll spawn a new socket to handle the request.
EventResult response; EventResult response;
{ {
std::lock_guard lock(write_mutex); std::unique_lock lock(write_mutex, std::try_to_lock);
write_object(socket, event); if (lock.owns_lock()) {
response = read_object<EventResult>(socket); write_object(socket, event);
response = read_object<EventResult>(socket);
} else {
boost::asio::local::stream_protocol::socket secondary_socket(
io_context);
secondary_socket.connect(endpoint);
write_object(secondary_socket, event);
response = read_object<EventResult>(secondary_socket);
}
} }
if (logging) { if (logging) {
@@ -363,6 +372,13 @@ class EventHandler {
} }
private: private:
/**
* The main IO context. New sockets created during `send()` will be bound to
* this context. In `receive()` we'll create a new IO context since we want
* to do all listening there on a dedicated thread.
*/
boost::asio::io_context& io_context;
boost::asio::local::stream_protocol::endpoint endpoint; boost::asio::local::stream_protocol::endpoint endpoint;
boost::asio::local::stream_protocol::socket socket; boost::asio::local::stream_protocol::socket socket;