Add EventHandler-like wrapper for simple sockets

This will greatly reduce boilerplate.
This commit is contained in:
Robbert van der Helm
2020-10-30 12:58:31 +01:00
parent 3788c1226b
commit 42792a883d
+165
View File
@@ -143,6 +143,168 @@ inline T read_object(Socket& socket) {
return read_object<T>(socket, buffer);
}
/**
* A single, long-living socket
*/
class SocketHandler {
public:
/**
* Sets up the sockets and start listening on the socket on the listening
* side. The sockets won't be active until `connect()` gets called.
*
* @param io_context The IO context the socket should be bound to.
* @param endpoint The endpoint this socket should connect to or listen on.
* @param listen If `true`, start listening on the sockets. Incoming
* connections will be accepted when `connect()` gets called. This should
* be set to `true` on the plugin side, and `false` on the Wine host side.
*
* @see Sockets::connect
*/
SocketHandler(boost::asio::io_context& io_context,
boost::asio::local::stream_protocol::endpoint endpoint,
bool listen)
: endpoint(endpoint), socket(io_context) {
if (listen) {
boost::filesystem::create_directories(
boost::filesystem::path(endpoint.path()).parent_path());
acceptor.emplace(io_context, endpoint);
}
}
/**
* Depending on the value of the `listen` argument passed to the
* constructor, either accept connections made to the sockets on the Linux
* side or connect to the sockets on the Wine side.
*/
void connect() {
if (acceptor) {
acceptor->accept(socket);
} else {
socket.connect(endpoint);
}
}
/**
* Close the socket. Both sides that are actively listening will be thrown a
* `boost::system_error` when this happens.
*/
void close() {
// The shutdown can fail when the socket is already closed
boost::system::error_code err;
socket.shutdown(
boost::asio::local::stream_protocol::socket::shutdown_both, err);
socket.close();
}
/**
* Serialize an object and send it over the socket.
*
* @param object The object to send.
* @param buffer The buffer to use for the serialization. This is used to
* prevent excess allocations when sending audio.
*
* @throw boost::system::system_error If the socket is closed or gets closed
* during sending.
*
* @warning This operation is not atomic, and calling this function with the
* same socket from multiple threads at once will cause issues with the
* packets arriving out of order.
*
* @see write_object
*/
template <typename T>
inline void send(const T& object, std::vector<uint8_t>& buffer) {
write_object(socket, object, buffer);
}
/**
* `SocketHandler::send()` with a small default buffer for convenience.
*
* @overload
*/
template <typename T>
inline void send(const T& object) {
write_object(socket, object);
}
/**
* Read a serialized object from the socket sent using `send()`. This will
* block until the object is available.
*
* @param buffer The buffer to read into. This is used to prevent excess
* allocations when sending audio.
*
* @return The deserialized object.
*
* @throw std::runtime_error If the conversion to an object was not
* successful.
* @throw boost::system::system_error If the socket is closed or gets closed
* while reading.
*
* @relates SocketHandler::send
*
* @see read_object
* @see SocketHandler::receive
*/
template <typename T>
inline T receive_single(std::vector<uint8_t>& buffer) {
return read_object<T>(socket, buffer);
}
/**
* `SocketHandler::receive_single()` with a small default buffer for
* convenience.
*
* @overload
*/
template <typename T>
inline T receive_single() {
return read_object<T>(socket);
}
/**
* Start a blocking loop to receive objects on this socket. This function
* will return once the socket gets closed.
*
* @param callback A function that gets passed the received object. Since
* we'd probably want to do some more stuff after sending a reply, calling
* `send()` is the responsibility of this function.
*
* @tparam F A function type in the form of `void(T)` that does something
* with the object, and then calls `send()`.
*
* @relates SocketHandler::send
*
* @see read_object
* @see SocketHandler::receive_single
*/
template <typename T, typename F>
void receive(F callback) {
std::vector<uint8_t> buffer{};
while (true) {
try {
auto object = receive_single<T>(buffer);
callback(std::move(object));
} catch (const boost::system::system_error&) {
// This happens when the sockets got closed because the plugin
// is being shut down
break;
}
}
}
private:
boost::asio::local::stream_protocol::endpoint endpoint;
boost::asio::local::stream_protocol::socket socket;
/**
* Will be used in `connect()` on the listening side to establish the
* connection.
*/
std::optional<boost::asio::local::stream_protocol::acceptor> acceptor;
};
/**
* Encodes the base behavior for reading from and writing to the `data` argument
* for event dispatch functions. This provides base functionality for these
@@ -227,6 +389,9 @@ class DefaultDataConverter {
*
* @tparam Thread The thread implementation to use. On the Linux side this
* should be `std::jthread` and on the Wine side this should be `Win32Thread`.
*
* TODO: Factor the listening logic out to a `MultiSocketHandler`, and then
* recreate this on top of that
*/
template <typename Thread>
class EventHandler {