diff --git a/src/common/communication.h b/src/common/communication.h index ea5a46ca..ade9eab2 100644 --- a/src/common/communication.h +++ b/src/common/communication.h @@ -143,6 +143,168 @@ inline T read_object(Socket& socket) { return read_object(socket, buffer); } +/** + * A single, long-living socket + */ +class SocketHandler { + public: + /** + * Sets up the sockets and start listening on the socket on the listening + * side. The sockets won't be active until `connect()` gets called. + * + * @param io_context The IO context the socket should be bound to. + * @param endpoint The endpoint this socket should connect to or listen on. + * @param listen If `true`, start listening on the sockets. Incoming + * connections will be accepted when `connect()` gets called. This should + * be set to `true` on the plugin side, and `false` on the Wine host side. + * + * @see Sockets::connect + */ + SocketHandler(boost::asio::io_context& io_context, + boost::asio::local::stream_protocol::endpoint endpoint, + bool listen) + : endpoint(endpoint), socket(io_context) { + if (listen) { + boost::filesystem::create_directories( + boost::filesystem::path(endpoint.path()).parent_path()); + acceptor.emplace(io_context, endpoint); + } + } + + /** + * Depending on the value of the `listen` argument passed to the + * constructor, either accept connections made to the sockets on the Linux + * side or connect to the sockets on the Wine side. + */ + void connect() { + if (acceptor) { + acceptor->accept(socket); + } else { + socket.connect(endpoint); + } + } + + /** + * Close the socket. Both sides that are actively listening will be thrown a + * `boost::system_error` when this happens. + */ + void close() { + // The shutdown can fail when the socket is already closed + boost::system::error_code err; + socket.shutdown( + boost::asio::local::stream_protocol::socket::shutdown_both, err); + socket.close(); + } + + /** + * Serialize an object and send it over the socket. + * + * @param object The object to send. + * @param buffer The buffer to use for the serialization. This is used to + * prevent excess allocations when sending audio. + * + * @throw boost::system::system_error If the socket is closed or gets closed + * during sending. + * + * @warning This operation is not atomic, and calling this function with the + * same socket from multiple threads at once will cause issues with the + * packets arriving out of order. + * + * @see write_object + */ + template + inline void send(const T& object, std::vector& buffer) { + write_object(socket, object, buffer); + } + + /** + * `SocketHandler::send()` with a small default buffer for convenience. + * + * @overload + */ + template + inline void send(const T& object) { + write_object(socket, object); + } + + /** + * Read a serialized object from the socket sent using `send()`. This will + * block until the object is available. + * + * @param buffer The buffer to read into. This is used to prevent excess + * allocations when sending audio. + * + * @return The deserialized object. + * + * @throw std::runtime_error If the conversion to an object was not + * successful. + * @throw boost::system::system_error If the socket is closed or gets closed + * while reading. + * + * @relates SocketHandler::send + * + * @see read_object + * @see SocketHandler::receive + */ + template + inline T receive_single(std::vector& buffer) { + return read_object(socket, buffer); + } + + /** + * `SocketHandler::receive_single()` with a small default buffer for + * convenience. + * + * @overload + */ + template + inline T receive_single() { + return read_object(socket); + } + + /** + * Start a blocking loop to receive objects on this socket. This function + * will return once the socket gets closed. + * + * @param callback A function that gets passed the received object. Since + * we'd probably want to do some more stuff after sending a reply, calling + * `send()` is the responsibility of this function. + * + * @tparam F A function type in the form of `void(T)` that does something + * with the object, and then calls `send()`. + * + * @relates SocketHandler::send + * + * @see read_object + * @see SocketHandler::receive_single + */ + template + void receive(F callback) { + std::vector buffer{}; + while (true) { + try { + auto object = receive_single(buffer); + + callback(std::move(object)); + } catch (const boost::system::system_error&) { + // This happens when the sockets got closed because the plugin + // is being shut down + break; + } + } + } + + private: + boost::asio::local::stream_protocol::endpoint endpoint; + boost::asio::local::stream_protocol::socket socket; + + /** + * Will be used in `connect()` on the listening side to establish the + * connection. + */ + std::optional acceptor; +}; + /** * Encodes the base behavior for reading from and writing to the `data` argument * for event dispatch functions. This provides base functionality for these @@ -227,6 +389,9 @@ class DefaultDataConverter { * * @tparam Thread The thread implementation to use. On the Linux side this * should be `std::jthread` and on the Wine side this should be `Win32Thread`. + * + * TODO: Factor the listening logic out to a `MultiSocketHandler`, and then + * recreate this on top of that */ template class EventHandler {