Add a generic ad hoc socket listener

This is a generalized version of EventHandler.
This commit is contained in:
Robbert van der Helm
2020-11-30 18:03:25 +01:00
parent 555b442f75
commit 5607a643e4
+312 -1
View File
@@ -28,6 +28,8 @@
#include <boost/asio/write.hpp>
#include <boost/filesystem.hpp>
#include "../logging.h"
template <typename B>
using OutputAdapter = bitsery::OutputBufferAdapter<B>;
@@ -277,7 +279,8 @@ class SocketHandler {
*
* @warning This operation is not atomic, and calling this function with the
* same socket from multiple threads at once will cause issues with the
* packets arriving out of order.
* packets arriving out of order. The caller is responsible for preventing
* this.
*
* @see write_object
* @see SocketHandler::receive_single
@@ -312,6 +315,14 @@ class SocketHandler {
* @throw boost::system::system_error If the socket is closed or gets closed
* while reading.
*
* @note This function can safely be called within the lambda of
* `SocketHandler::receive_multi()`.
*
* @warning This operation is not atomic, and calling this function with the
* same socket from multiple threads at once will cause issues with the
* packets arriving out of order. The caller is responsible for preventing
* this.
*
* @relates SocketHandler::send
*
* @see read_object
@@ -377,3 +388,303 @@ class SocketHandler {
*/
std::optional<boost::asio::local::stream_protocol::acceptor> acceptor;
};
/**
* There are situations where we can not know in advance how many sockets we
* need. The main example of this are VST2 `dispatcher()` and `audioMaster()`
* calls. These functions can be called from multiple threads at the same time,
* so using a single socket with a mutex to prevent two threads from using the
* socket at the same time would cause issues. Luckily this situation does not
* come up that often, so to work around it we'll do two things:
*
* - We'll keep a single long lived socket connection. This works the exact same
* way as every other `SocketHandler` socket. When we want to send data and
* the socket is primary socket is not currently being written to, we'll just
* use that. On the listening side we'll read from this in a loop.
* - On the listening side we also have a second thread asynchronously listening
* for new connections on the socket endpoint. When the sending side wants to
* send data and the primary socket is in use, it will instantiate a new
* connection to the same socket endpoint and it will send the data over that
* socket instead. On the listening side the new connection will be accepted,
* and a newly spawned thread will handle the incoming connection just like it
* would for the primary socket.
*
* @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
*/
template <typename Thread>
class AdHocSocketHandler {
   protected:
    /**
     * Sets up a single primary socket. The sockets won't be active until
     * `connect()` gets called.
     *
     * @param io_context The IO context the primary socket should be bound to.
     *   A new IO context will be created for accepting the additional
     *   incoming connections.
     * @param endpoint The socket endpoint used for this event handler.
     * @param listen If `true`, start listening on the sockets. Incoming
     *   connections will be accepted when `connect()` gets called. This
     *   should be set to `true` on the plugin side, and `false` on the Wine
     *   host side.
     *
     * @see Sockets::connect
     */
    AdHocSocketHandler(boost::asio::io_context& io_context,
                       boost::asio::local::stream_protocol::endpoint endpoint,
                       bool listen)
        : io_context(io_context), endpoint(endpoint), socket(io_context) {
        if (listen) {
            // The directory containing the Unix domain socket may not exist
            // yet, so create it before binding the acceptor
            boost::filesystem::create_directories(
                boost::filesystem::path(endpoint.path()).parent_path());
            acceptor.emplace(io_context, endpoint);
        }
    }

   public:
    /**
     * Depending on the value of the `listen` argument passed to the
     * constructor, either accept connections made to the sockets on the Linux
     * side or connect to the sockets on the Wine side.
     */
    void connect() {
        if (acceptor) {
            acceptor->accept(socket);

            // As mentioned in `acceptor`'s docstring, this acceptor will be
            // recreated in `receive_multi()` on another context, and
            // potentially on the other side of the connection in the case
            // where we're handling `vst_host_callback` VST2 events
            acceptor.reset();
            boost::filesystem::remove(endpoint.path());
        } else {
            socket.connect(endpoint);
        }
    }

    /**
     * Close the socket. Both sides that are actively listening will be thrown
     * a `boost::system_error` when this happens.
     */
    void close() {
        // The shutdown can fail when the socket is already closed, which is
        // why the error code overload is used and the error is ignored
        boost::system::error_code err;
        socket.shutdown(
            boost::asio::local::stream_protocol::socket::shutdown_both, err);
        socket.close();
    }

   protected:
    /**
     * Serialize and send an event over a socket. This is used for both the
     * host -> plugin 'dispatch' events and the plugin -> host 'audioMaster'
     * host callbacks since they follow the same format. See one of those
     * functions for details on the parameters and return value of this
     * function.
     *
     * As described above, if this function is currently being called from
     * another thread, then this will create a new socket connection and send
     * the event there instead.
     *
     * @param callback A function that will be called with a reference to a
     *   socket. This is either the primary `socket`, or a new ad hoc socket
     *   if this function is currently being called from another thread.
     *
     * @tparam F A function in the form of
     *   `void(boost::asio::local::stream_protocol::socket&)`.
     */
    template <typename F>
    void send(F callback) {
        // XXX: Maybe at some point we should benchmark how often this ad hoc
        //      socket spawning mechanism gets used. If some hosts for
        //      instance consistently and repeatedly trigger this then we
        //      might be able to do some optimizations there.
        std::unique_lock lock(write_mutex, std::try_to_lock);
        if (lock.owns_lock()) {
            // The primary socket is free, so we can use it directly
            callback(socket);
        } else {
            try {
                // Another thread holds the primary socket, so spawn a
                // short-lived secondary connection to the same endpoint
                boost::asio::local::stream_protocol::socket secondary_socket(
                    io_context);
                secondary_socket.connect(endpoint);

                callback(secondary_socket);
            } catch (const boost::system::system_error&) {
                // So, what do we do when no one is listening on the endpoint
                // yet? This can happen with plugin groups when the Wine host
                // process does an `audioMaster()` call before the plugin is
                // listening. If that happens we'll fall back to a synchronous
                // request. This is not very pretty, so if anyone can think of
                // a better way to structure all of this while still
                // maintaining a long living primary socket please let me
                // know.
                std::lock_guard lock(write_mutex);
                callback(socket);
            }
        }
    }

    /**
     * Spawn a new thread to listen for extra connections to `endpoint`, and
     * then run a blocking loop that handles incoming data from the primary
     * `socket`.
     *
     * @param logging A pair containing a logger instance and whether or not
     *   this is for sending `dispatch()` events or host callbacks. Optional
     *   since it doesn't have to be done on both sides.
     * @param primary_callback A function that will do a single read cycle for
     *   the primary `socket`. This is called in a loop so it shouldn't do any
     *   looping itself.
     * @param secondary_callback A function that will be called when we
     *   receive an incoming connection on a secondary socket. This would
     *   often do the same thing as `primary_callback`, but secondary sockets
     *   may need some different handling.
     *
     * TODO: Add an overload with a single callback
     *
     * @tparam F A function type in the form of
     *   `void(boost::asio::local::stream_protocol::socket&)`.
     * @tparam G The same as `F`.
     */
    template <typename F, typename G>
    void receive_multi(std::optional<std::pair<Logger&, bool>> logging,
                       F primary_callback,
                       G secondary_callback) {
        // As described above we'll handle incoming requests for `socket` on
        // this thread. We'll also listen for incoming connections on
        // `endpoint` on another thread. For any incoming connection we'll
        // spawn a new thread to handle the request. When `socket` closes and
        // this loop breaks, the listener and any still active threads will be
        // cleaned up before this function exits.
        boost::asio::io_context secondary_context{};

        // The previous acceptor has already been shut down by
        // `AdHocSocketHandler::connect()`
        acceptor.emplace(secondary_context, endpoint);

        // This works the exact same way as `active_plugins` and
        // `next_plugin_id` in `GroupBridge`. NOTE(review): this relies on
        // `Thread` being default constructible, move assignable, and joining
        // on destruction (true for `std::jthread`; presumably also for
        // `Win32Thread` — verify)
        std::map<size_t, Thread> active_secondary_requests{};
        std::atomic_size_t next_request_id{};
        std::mutex active_secondary_requests_mutex{};
        accept_requests(
            *acceptor, logging,
            [&](boost::asio::local::stream_protocol::socket secondary_socket) {
                const size_t request_id = next_request_id.fetch_add(1);

                // We have to make sure to keep moving these sockets into the
                // threads that will handle them
                std::lock_guard lock(active_secondary_requests_mutex);
                active_secondary_requests[request_id] = Thread(
                    [&, request_id](boost::asio::local::stream_protocol::socket
                                        secondary_socket) {
                        secondary_callback(secondary_socket);

                        // When we have processed this request, we'll join the
                        // thread again with the thread that's handling
                        // `secondary_context`
                        boost::asio::post(secondary_context, [&, request_id]() {
                            std::lock_guard lock(
                                active_secondary_requests_mutex);

                            // The join is implicit because we're using
                            // `std::jthread`/`Win32Thread`
                            active_secondary_requests.erase(request_id);
                        });
                    },
                    std::move(secondary_socket));
            });

        // All accepting of secondary connections and joining of finished
        // request threads happens on this dedicated thread
        Thread secondary_requests_handler([&]() { secondary_context.run(); });

        // Now we'll handle reads on the primary socket in a loop until the
        // socket shuts down
        while (true) {
            try {
                primary_callback(socket);
            } catch (const boost::system::system_error&) {
                // This happens when the sockets got closed because the plugin
                // is being shut down
                break;
            }
        }

        // After the primary socket gets terminated (during shutdown) we'll
        // make sure all outstanding jobs have been processed and then drop
        // all work from the IO context
        std::lock_guard lock(active_secondary_requests_mutex);
        secondary_context.stop();
        acceptor.reset();
    }

   private:
    /**
     * Used in `receive_multi()` to asynchronously listen for secondary socket
     * connections. After `callback()` returns this function will continue to
     * be called until the IO context gets stopped.
     *
     * @param acceptor The acceptor we will be listening on.
     * @param logging A pair containing a logger instance and whether or not
     *   this is for sending `dispatch()` events or host callbacks. Optional
     *   since it doesn't have to be done on both sides.
     * @param callback A function that handles the new socket connection.
     *
     * @tparam F A function in the form
     *   `void(boost::asio::local::stream_protocol::socket)` to handle a new
     *   incoming connection.
     */
    template <typename F>
    void accept_requests(
        boost::asio::local::stream_protocol::acceptor& acceptor,
        std::optional<std::pair<Logger&, bool>> logging,
        F callback) {
        acceptor.async_accept(
            [&, logging, callback](
                const boost::system::error_code& error,
                boost::asio::local::stream_protocol::socket secondary_socket) {
                if (error.failed()) {
                    // On the Wine side it's expected that the primary socket
                    // connection will be dropped during shutdown, so we can
                    // silently ignore any related socket errors on the Wine
                    // side
                    if (logging) {
                        // NOTE(review): `is_dispatch` is unused here; only
                        // the logger itself is needed for this message
                        auto [logger, is_dispatch] = *logging;
                        logger.log("Failure while accepting connections: " +
                                   error.message());
                    }

                    return;
                }

                callback(std::move(secondary_socket));

                // Rearm the acceptor so we keep listening for further
                // secondary connections until the context gets stopped
                accept_requests(acceptor, logging, callback);
            });
    }

    /**
     * The main IO context. New sockets created during `send()` will be bound
     * to this context. In `receive_multi()` we'll create a new IO context
     * since we want to do all listening there on a dedicated thread.
     */
    boost::asio::io_context& io_context;

    boost::asio::local::stream_protocol::endpoint endpoint;
    boost::asio::local::stream_protocol::socket socket;

    /**
     * This acceptor will be used once synchronously on the listening side
     * during `Sockets::connect()`. When `AdHocSocketHandler::receive_multi()`
     * is then called, we'll recreate the acceptor to asynchronously listen
     * for new incoming socket connections on `endpoint`. This is important,
     * because in the case of `Vst2Sockets`' `vst_host_callback` the acceptor
     * first accepts an initial socket on the plugin side (like all sockets),
     * but all additional incoming connections of course have to be listened
     * for on the plugin side.
     */
    std::optional<boost::asio::local::stream_protocol::acceptor> acceptor;

    /**
     * A mutex that locks the primary `socket`. If this is locked, then any
     * new events will be sent over a new socket instead.
     */
    std::mutex write_mutex;
};