diff --git a/.gitignore b/.gitignore index f704edeb8..9e11b5d75 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,5 @@ CMakeSettings.json # Windows global filetypes Thumbs.db +# Python files +*.pyc diff --git a/.gitmodules b/.gitmodules index 00704ec70..f79819a95 100644 --- a/.gitmodules +++ b/.gitmodules @@ -37,3 +37,9 @@ [submodule "discord-rpc"] path = externals/discord-rpc url = https://github.com/discordapp/discord-rpc.git +[submodule "externals/libzmq"] + path = externals/libzmq + url = https://github.com/zeromq/libzmq +[submodule "externals/cppzmq"] + path = externals/cppzmq + url = https://github.com/zeromq/cppzmq \ No newline at end of file diff --git a/.travis/common/post-upload.sh b/.travis/common/post-upload.sh index 28735a9cf..18162ba9a 100755 --- a/.travis/common/post-upload.sh +++ b/.travis/common/post-upload.sh @@ -4,6 +4,9 @@ cp license.txt "$REV_NAME" cp README.md "$REV_NAME" +# Copy cross-platform scripting support +cp -r dist/scripting "$REV_NAME" + tar $COMPRESSION_FLAGS "$ARCHIVE_NAME" "$REV_NAME" # Find out what release we are building diff --git a/dist/scripting/citra.py b/dist/scripting/citra.py new file mode 100644 index 000000000..c73acd681 --- /dev/null +++ b/dist/scripting/citra.py @@ -0,0 +1,94 @@ +import zmq +import struct +import random +import binascii + +CURRENT_REQUEST_VERSION = 1 +MAX_REQUEST_DATA_SIZE = 32 + +REQUEST_TYPE_READ_MEMORY = 1 +REQUEST_TYPE_WRITE_MEMORY = 2 + +CITRA_PORT = "45987" + +class Citra: + def __init__(self, address="127.0.0.1", port=CITRA_PORT): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect("tcp://" + address + ":" + port) + + def is_connected(self): + return self.socket is not None + + def _generate_header(self, request_type, data_size): + request_id = random.getrandbits(32) + return (struct.pack("IIII", CURRENT_REQUEST_VERSION, request_id, request_type, data_size), request_id) + + def _read_and_validate_header(self, raw_reply, expected_id, expected_type): + reply_version, reply_id, reply_type, reply_data_size = struct.unpack("IIII", raw_reply[:4*4]) + if (CURRENT_REQUEST_VERSION == reply_version and + expected_id == reply_id and + expected_type == reply_type and + reply_data_size == len(raw_reply[4*4:])): + return raw_reply[4*4:] + return None + + def read_memory(self, read_address, read_size): + """ + >>> c.read_memory(0x100000, 4) + b'\\x07\\x00\\x00\\xeb' + """ + result = bytes() + while read_size > 0: + temp_read_size = min(read_size, MAX_REQUEST_DATA_SIZE) + request_data = struct.pack("II", read_address, temp_read_size) + request, request_id = self._generate_header(REQUEST_TYPE_READ_MEMORY, len(request_data)) + request += request_data + self.socket.send(request) + + raw_reply = self.socket.recv() + reply_data = self._read_and_validate_header(raw_reply, request_id, REQUEST_TYPE_READ_MEMORY) + + if reply_data: + result += reply_data + read_size -= len(reply_data) + read_address += len(reply_data) + else: + return None + + return result + + def write_memory(self, write_address, write_contents): + """ + >>> c.write_memory(0x100000, b"\\xff\\xff\\xff\\xff") + True + >>> c.read_memory(0x100000, 4) + b'\\xff\\xff\\xff\\xff' + >>> c.write_memory(0x100000, b"\\x07\\x00\\x00\\xeb") + True + >>> c.read_memory(0x100000, 4) + b'\\x07\\x00\\x00\\xeb' + """ + write_size = len(write_contents) + while write_size > 0: + temp_write_size = min(write_size, MAX_REQUEST_DATA_SIZE - 8) + request_data = struct.pack("II", write_address, temp_write_size) + request_data += write_contents[:temp_write_size] + request, request_id = self._generate_header(REQUEST_TYPE_WRITE_MEMORY, len(request_data)) + request += request_data + self.socket.send(request) + + raw_reply = self.socket.recv() + reply_data = self._read_and_validate_header(raw_reply, request_id, REQUEST_TYPE_WRITE_MEMORY) + + if None != reply_data: + write_address += temp_write_size + write_size -= temp_write_size + write_contents = write_contents[temp_write_size:] + else: + return False + return True + +if "__main__" == __name__: + import doctest + doctest.testmod(extraglobs={'c': Citra()}) diff --git a/externals/CMakeLists.txt b/externals/CMakeLists.txt index 139454d69..8ea3c8b44 100644 --- a/externals/CMakeLists.txt +++ b/externals/CMakeLists.txt @@ -2,6 +2,7 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${PROJECT_SOURCE_DIR}/CMakeModules) include(DownloadExternals) +include(ExternalProject) # Catch add_library(catch-single-include INTERFACE) @@ -92,3 +93,96 @@ if (ENABLE_WEB_SERVICE) add_library(json-headers INTERFACE) target_include_directories(json-headers INTERFACE ./json) endif() + +# ZeroMQ +# libzmq includes its own clang-format target, which conflicts with the +# clang-format in Citra if libzmq is added as a subdirectory. An external +# project gets around this issue. Unfortunately, a lot of different +# configuration options are required for each different platform. An +# attempt was made to use CMake variables where possible, but some +# information necessarily had to be repeated. Hopefully there is not +# often a need to change anything. +if (MINGW) + if (${CMAKE_HOST_SYSTEM_NAME} STREQUAL "Windows") + set(LIBZMQ_MAKE mingw32-make) + set(LIBZMQ_COMPILER "") + set(LIBZMQ_TOOLCHAIN_FILE "") + else() + set(LIBZMQ_MAKE make) + set(LIBZMQ_COMPILER -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER};-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}) + set(LIBZMQ_TOOLCHAIN_FILE -DCMAKE_TOOLCHAIN_FILE=${PROJECT_SOURCE_DIR}/CMakeModules/MinGWCross.cmake) + endif() + ExternalProject_Add(libzmq-external + SOURCE_DIR ./libzmq + CMAKE_ARGS -DWITH_PERF_TOOL=OFF;-DZMQ_BUILD_TESTS=OFF;-DENABLE_CPACK=OFF;-DCMAKE_MAKE_PROGRAM=${LIBZMQ_MAKE};-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE};${LIBZMQ_TOOLCHAIN_FILE};${LIBZMQ_COMPILER} + BUILD_COMMAND cmake --build ${CMAKE_CURRENT_BINARY_DIR}/libzmq-external-prefix/src/libzmq-external-build --target libzmq-static --config ${CMAKE_BUILD_TYPE} + GIT_REPOSITORY https://github.com/zeromq/libzmq + GIT_TAG v4.2.5 + INSTALL_COMMAND "") +else() + if (MSVC) + set(LIBZMQ_COMPILER_FLAGS -DCMAKE_C_FLAGS=/GL-;-DCMAKE_CXX_FLAGS=/GL-) + else() + set(LIBZMQ_COMPILER_FLAGS "") + endif() + ExternalProject_Add(libzmq-external + SOURCE_DIR ./libzmq + CMAKE_ARGS -DCMAKE_MACOSX_RPATH=1;-DCMAKE_OSX_ARCHITECTURES=x86_64;-DWITH_PERF_TOOL=OFF;-DZMQ_BUILD_TESTS=OFF;-DENABLE_CPACK=OFF;-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE};${LIBZMQ_COMPILER_FLAGS} + BUILD_COMMAND cmake --build ${CMAKE_CURRENT_BINARY_DIR}/libzmq-external-prefix/src/libzmq-external-build --target libzmq-static --config ${CMAKE_BUILD_TYPE} + GIT_REPOSITORY https://github.com/zeromq/libzmq + GIT_TAG v4.2.5 + INSTALL_COMMAND "") +endif() +set(LIBZMQ_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzmq-external-prefix/src/libzmq-external-build/lib) +# On macOS, we need to build a fat static library containing both x86_64 and x86_64h, since macOS +# targets specify two architectures in their link command line ("-arch x86_64 -arch x86_64h"). +if (APPLE) + ExternalProject_Add(libzmq-external-h + SOURCE_DIR ./libzmq-h + CMAKE_ARGS -DCMAKE_MACOSX_RPATH=1;-DCMAKE_OSX_ARCHITECTURES=x86_64h;-DWITH_PERF_TOOL=OFF;-DZMQ_BUILD_TESTS=OFF;-DENABLE_CPACK=OFF;-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + BUILD_COMMAND cmake --build ${CMAKE_CURRENT_BINARY_DIR}/libzmq-external-h-prefix/src/libzmq-external-h-build --target libzmq-static --config ${CMAKE_BUILD_TYPE} + GIT_REPOSITORY https://github.com/zeromq/libzmq + GIT_TAG v4.2.5 + INSTALL_COMMAND "") + set(LIBZMQ_H_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzmq-external-h-prefix/src/libzmq-external-h-build/lib) + + add_library(libzmq-external-imported STATIC IMPORTED GLOBAL) + add_library(libzmq-external-imported-h STATIC IMPORTED GLOBAL) + add_dependencies(libzmq-external-imported libzmq-external) + add_dependencies(libzmq-external-imported-h libzmq-external-h) +else() + add_library(libzmq STATIC IMPORTED GLOBAL) + add_dependencies(libzmq libzmq-external) +endif() +# Set up the imported target properties +if (MSVC) + set_target_properties(libzmq PROPERTIES IMPORTED_LOCATION ${LIBZMQ_DIR}/${CMAKE_BUILD_TYPE}/libzmq-v141-mt-s-4_2_5${CMAKE_STATIC_LIBRARY_SUFFIX}) + set_target_properties(libzmq PROPERTIES IMPORTED_LINK_INTERFACE_LIBRARIES iphlpapi${CMAKE_STATIC_LIBRARY_SUFFIX}) +else() + if (APPLE) + set_target_properties(libzmq-external-imported PROPERTIES IMPORTED_LOCATION ${LIBZMQ_DIR}/libzmq${CMAKE_STATIC_LIBRARY_SUFFIX}) + set_target_properties(libzmq-external-imported-h PROPERTIES IMPORTED_LOCATION ${LIBZMQ_H_DIR}/libzmq${CMAKE_STATIC_LIBRARY_SUFFIX}) + else() + set_target_properties(libzmq PROPERTIES IMPORTED_LOCATION ${LIBZMQ_DIR}/libzmq${CMAKE_STATIC_LIBRARY_SUFFIX}) + if(MINGW) + set_target_properties(libzmq PROPERTIES IMPORTED_LINK_INTERFACE_LIBRARIES "ws2_32${CMAKE_STATIC_LIBRARY_SUFFIX};iphlpapi${CMAKE_STATIC_LIBRARY_SUFFIX}") + endif() + endif() +endif() +# On macOS, create the combined target +if (APPLE) + set(LIBZMQ_COMBINED_OUTPUT ${LIBZMQ_DIR}/libzmq_combined${CMAKE_STATIC_LIBRARY_SUFFIX}) + add_custom_target(libzmq-combined COMMAND lipo -create ${LIBZMQ_DIR}/libzmq${CMAKE_STATIC_LIBRARY_SUFFIX} ${LIBZMQ_H_DIR}/libzmq${CMAKE_STATIC_LIBRARY_SUFFIX} -o ${LIBZMQ_COMBINED_OUTPUT} + BYPRODUCTS ${LIBZMQ_COMBINED_OUTPUT}) + add_dependencies(libzmq-combined libzmq-external-imported libzmq-external-imported-h) + add_library(libzmq STATIC IMPORTED GLOBAL) + set_target_properties(libzmq PROPERTIES IMPORTED_LOCATION ${LIBZMQ_COMBINED_OUTPUT}) + add_dependencies(libzmq libzmq-combined) +endif() +# C interface to ZeroMQ +add_library(libzmq-headers INTERFACE) +target_include_directories(libzmq-headers INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/libzmq/include) +# C++ interface to ZeroMQ +add_library(cppzmq-headers INTERFACE) +target_include_directories(cppzmq-headers INTERFACE ./cppzmq) +add_dependencies(cppzmq-headers libzmq) diff --git a/externals/cppzmq b/externals/cppzmq new file mode 160000 index 000000000..6aa3ab686 --- /dev/null +++ b/externals/cppzmq @@ -0,0 +1 @@ +Subproject commit 6aa3ab686e916cb0e62df7fa7d12e0b13ae9fae6 diff --git a/externals/libzmq b/externals/libzmq new file mode 160000 index 000000000..d062edd8c --- /dev/null +++ b/externals/libzmq @@ -0,0 +1 @@ +Subproject commit d062edd8c142384792955796329baf1e5a3377cd diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index d9e9ac725..45f2bbac2 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -206,7 +206,8 @@ void FileBackend::Write(const Entry& entry) { CLS(Network) \ CLS(Movie) \ CLS(Loader) \ - CLS(WebService) + CLS(WebService) \ + CLS(RPC_Server) // GetClassName is a macro defined by Windows.h, grrr... const char* GetLogClassName(Class log_class) { diff --git a/src/common/logging/log.h b/src/common/logging/log.h index 6c0816abc..f8f70216f 100644 --- a/src/common/logging/log.h +++ b/src/common/logging/log.h @@ -98,6 +98,7 @@ enum class Class : ClassType { Network, ///< Network emulation Movie, ///< Movie (Input Recording) Playback WebService, ///< Interface to Citra Web Services + RPC_Server, ///< RPC server Count ///< Total number of logging classes }; diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index e291302dc..b1a14c782 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -410,6 +410,14 @@ add_library(core STATIC movie.h perf_stats.cpp perf_stats.h + rpc/packet.cpp + rpc/packet.h + rpc/rpc_server.cpp + rpc/rpc_server.h + rpc/server.cpp + rpc/server.h + rpc/zmq_server.cpp + rpc/zmq_server.h settings.cpp settings.h telemetry_session.cpp @@ -436,3 +444,5 @@ if (ARCHITECTURE_x86_64) ) target_link_libraries(core PRIVATE dynarmic) endif() + +target_link_libraries(core PUBLIC libzmq-headers cppzmq-headers libzmq) diff --git a/src/core/core.cpp b/src/core/core.cpp index 0ce5fcb8b..bd8be21c9 100644 --- a/src/core/core.cpp +++ b/src/core/core.cpp @@ -25,6 +25,7 @@ #include "core/loader/loader.h" #include "core/memory_setup.h" #include "core/movie.h" +#include "core/rpc/rpc_server.h" #include "core/settings.h" #include "network/network.h" #include "video_core/video_core.h" @@ -173,6 +174,7 @@ System::ResultStatus System::Init(EmuWindow& emu_window, u32 system_mode) { dsp_core->EnableStretching(Settings::values.enable_audio_stretching); telemetry_session = std::make_unique(); + rpc_server = std::make_unique(); service_manager = std::make_shared(); shared_page_handler = std::make_shared(); @@ -224,6 +226,7 @@ void System::Shutdown() { Kernel::Shutdown(); HW::Shutdown(); telemetry_session.reset(); + rpc_server.reset(); service_manager.reset(); dsp_core.reset(); cpu_core.reset(); diff --git a/src/core/core.h b/src/core/core.h index b4f3408f3..3477865fb 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -21,6 +21,10 @@ namespace AudioCore { class DspInterface; } +namespace RPC { +class RPCServer; +} + namespace Service { namespace SM { class ServiceManager; @@ -202,6 +206,9 @@ private: /// Frontend applets std::shared_ptr registered_swkbd; + /// RPC Server for scripting support + std::unique_ptr rpc_server; + /// Shared Page std::shared_ptr shared_page_handler; diff --git a/src/core/rpc/packet.cpp b/src/core/rpc/packet.cpp new file mode 100644 index 000000000..8ad0d0524 --- /dev/null +++ b/src/core/rpc/packet.cpp @@ -0,0 +1,15 @@ +#include +#include + +#include "core/rpc/packet.h" + +namespace RPC { + +Packet::Packet(const PacketHeader& header, u8* data, + std::function send_reply_callback) + : header(header), send_reply_callback(std::move(send_reply_callback)) { + + std::memcpy(packet_data.data(), data, std::min(header.packet_size, MAX_PACKET_DATA_SIZE)); +} + +}; // namespace RPC diff --git a/src/core/rpc/packet.h b/src/core/rpc/packet.h new file mode 100644 index 000000000..8a2491d63 --- /dev/null +++ b/src/core/rpc/packet.h @@ -0,0 +1,78 @@ +// Copyright 2018 Citra Emulator Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#include "common/common_types.h" + +namespace RPC { + +enum class PacketType { + Undefined = 0, + ReadMemory, + WriteMemory, +}; + +struct PacketHeader { + u32 version; + u32 id; + PacketType packet_type; + u32 packet_size; +}; + +constexpr u32 CURRENT_VERSION = 1; +constexpr u32 MIN_PACKET_SIZE = sizeof(PacketHeader); +constexpr u32 MAX_PACKET_DATA_SIZE = 32; +constexpr u32 MAX_PACKET_SIZE = MIN_PACKET_SIZE + MAX_PACKET_DATA_SIZE; +constexpr u32 MAX_READ_SIZE = MAX_PACKET_DATA_SIZE; + +class Packet { +public: + Packet(const PacketHeader& header, u8* data, std::function send_reply_callback); + + u32 GetVersion() const { + return header.version; + } + + u32 GetId() const { + return header.id; + } + + PacketType GetPacketType() const { + return header.packet_type; + } + + u32 GetPacketDataSize() const { + return header.packet_size; + } + + const PacketHeader& GetHeader() const { + return header; + } + + std::array& GetPacketData() { + return packet_data; + } + + void SetPacketDataSize(u32 size) { + header.packet_size = size; + } + + void SendReply() { + send_reply_callback(*this); + } + +private: + void HandleReadMemory(u32 address, u32 data_size); + void HandleWriteMemory(u32 address, const u8* data, u32 data_size); + + struct PacketHeader header; + std::array packet_data; + + std::function send_reply_callback; +}; + +} // namespace RPC diff --git a/src/core/rpc/rpc_server.cpp b/src/core/rpc/rpc_server.cpp new file mode 100644 index 000000000..0a3b046ec --- /dev/null +++ b/src/core/rpc/rpc_server.cpp @@ -0,0 +1,139 @@ +#include "common/logging/log.h" +#include "core/arm/arm_interface.h" +#include "core/core.h" +#include "core/memory.h" +#include "core/rpc/packet.h" +#include "core/rpc/rpc_server.h" + +namespace RPC { + +RPCServer::RPCServer() : server(*this) { + LOG_INFO(RPC_Server, "Starting RPC server ..."); + + Start(); + + LOG_INFO(RPC_Server, "RPC started."); +} + +RPCServer::~RPCServer() { + LOG_INFO(RPC_Server, "Stopping RPC ..."); + + Stop(); + + LOG_INFO(RPC_Server, "RPC stopped."); +} + +void RPCServer::HandleReadMemory(Packet& packet, u32 address, u32 data_size) { + if (data_size > MAX_READ_SIZE) { + return; + } + + // Note: Memory read occurs asynchronously from the state of the emulator + Memory::ReadBlock(address, packet.GetPacketData().data(), data_size); + packet.SetPacketDataSize(data_size); + packet.SendReply(); +} + +void RPCServer::HandleWriteMemory(Packet& packet, u32 address, const u8* data, u32 data_size) { + // Only allow writing to certain memory regions + if ((address >= Memory::PROCESS_IMAGE_VADDR && address <= Memory::PROCESS_IMAGE_VADDR_END) || + (address >= Memory::HEAP_VADDR && address <= Memory::HEAP_VADDR_END) || + (address >= Memory::N3DS_EXTRA_RAM_VADDR && address <= Memory::N3DS_EXTRA_RAM_VADDR_END)) { + // Note: Memory write occurs asynchronously from the state of the emulator + Memory::WriteBlock(address, data, data_size); + // If the memory happens to be executable code, make sure the changes become visible + Core::CPU().InvalidateCacheRange(address, data_size); + } + packet.SetPacketDataSize(0); + packet.SendReply(); +} + +bool RPCServer::ValidatePacket(const PacketHeader& packet_header) { + if (packet_header.version <= CURRENT_VERSION) { + switch (packet_header.packet_type) { + case PacketType::ReadMemory: + case PacketType::WriteMemory: + if (packet_header.packet_size >= (sizeof(u32) * 2)) { + return true; + } + break; + default: + break; + } + } + return false; +} + +void RPCServer::HandleSingleRequest(std::unique_ptr request_packet) { + bool success = false; + + if (ValidatePacket(request_packet->GetHeader())) { + // Currently, all request types use the address/data_size wire format + u32 address = 0; + u32 data_size = 0; + std::memcpy(&address, request_packet->GetPacketData().data(), sizeof(address)); + std::memcpy(&data_size, request_packet->GetPacketData().data() + sizeof(address), + sizeof(data_size)); + + switch (request_packet->GetPacketType()) { + case PacketType::ReadMemory: + if (data_size > 0 && data_size <= MAX_READ_SIZE) { + HandleReadMemory(*request_packet, address, data_size); + success = true; + } + break; + case PacketType::WriteMemory: + if (data_size > 0 && data_size <= MAX_PACKET_DATA_SIZE - (sizeof(u32) * 2)) { + const u8* data = request_packet->GetPacketData().data() + (sizeof(u32) * 2); + HandleWriteMemory(*request_packet, address, data, data_size); + success = true; + } + break; + default: + break; + } + } + + if (!success) { + // Send an empty reply, so as not to hang the client + request_packet->SetPacketDataSize(0); + request_packet->SendReply(); + } +} + +void RPCServer::HandleRequestsLoop() { + std::unique_ptr request_packet; + + LOG_INFO(RPC_Server, "Request handler started."); + + while (true) { + std::unique_lock lock(request_queue_mutex); + request_queue_cv.wait(lock, [&] { return !running || request_queue.Pop(request_packet); }); + if (!running) { + break; + } + HandleSingleRequest(std::move(request_packet)); + } +} + +void RPCServer::QueueRequest(std::unique_ptr request) { + std::unique_lock lock(request_queue_mutex); + request_queue.Push(std::move(request)); + request_queue_cv.notify_one(); +} + +void RPCServer::Start() { + running = true; + const auto threadFunction = [this]() { HandleRequestsLoop(); }; + request_handler_thread = std::thread(threadFunction); + server.Start(); +} + +void RPCServer::Stop() { + running = false; + request_queue_cv.notify_one(); + request_handler_thread.join(); + server.Stop(); +} + +}; // namespace RPC diff --git a/src/core/rpc/rpc_server.h b/src/core/rpc/rpc_server.h new file mode 100644 index 000000000..62fdb739c --- /dev/null +++ b/src/core/rpc/rpc_server.h @@ -0,0 +1,40 @@ +// Copyright 2018 Citra Emulator Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#include +#include +#include "common/threadsafe_queue.h" +#include "core/rpc/server.h" + +namespace RPC { + +class RPCServer { +public: + RPCServer(); + ~RPCServer(); + + void QueueRequest(std::unique_ptr request); + +private: + void Start(); + void Stop(); + void HandleReadMemory(Packet& packet, u32 address, u32 data_size); + void HandleWriteMemory(Packet& packet, u32 address, const u8* data, u32 data_size); + bool ValidatePacket(const PacketHeader& packet_header); + void HandleSingleRequest(std::unique_ptr request); + void HandleRequestsLoop(); + + Server server; + Common::SPSCQueue> request_queue; + bool running = false; + std::thread request_handler_thread; + std::mutex request_queue_mutex; + std::condition_variable request_queue_cv; +}; + +} // namespace RPC diff --git a/src/core/rpc/server.cpp b/src/core/rpc/server.cpp new file mode 100644 index 000000000..950881e9b --- /dev/null +++ b/src/core/rpc/server.cpp @@ -0,0 +1,35 @@ +#include + +#include "common/threadsafe_queue.h" +#include "core/core.h" +#include "core/rpc/rpc_server.h" +#include "core/rpc/server.h" + +namespace RPC { + +Server::Server(RPCServer& rpc_server) : rpc_server(rpc_server) {} + +void Server::Start() { + const auto callback = [this](std::unique_ptr new_request) { + NewRequestCallback(std::move(new_request)); + }; + + try { + zmq_server = std::make_unique(callback); + } catch (...) { + LOG_ERROR(RPC_Server, "Error starting ZeroMQ server"); + } +} + +void Server::Stop() { + zmq_server.reset(); +} + +void Server::NewRequestCallback(std::unique_ptr new_request) { + LOG_INFO(RPC_Server, "Received request version={} id={} type={} size={}", + new_request->GetVersion(), new_request->GetId(), + static_cast(new_request->GetPacketType()), new_request->GetPacketDataSize()); + rpc_server.QueueRequest(std::move(new_request)); +} + +}; // namespace RPC diff --git a/src/core/rpc/server.h b/src/core/rpc/server.h new file mode 100644 index 000000000..2dfad2ef7 --- /dev/null +++ b/src/core/rpc/server.h @@ -0,0 +1,27 @@ +// Copyright 2018 Citra Emulator Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include "core/rpc/packet.h" +#include "core/rpc/zmq_server.h" + +namespace RPC { + +class RPCServer; +class ZMQServer; + +class Server { +public: + Server(RPCServer& rpc_server); + void Start(); + void Stop(); + void NewRequestCallback(std::unique_ptr new_request); + +private: + RPCServer& rpc_server; + std::unique_ptr zmq_server; +}; + +} // namespace RPC diff --git a/src/core/rpc/zmq_server.cpp b/src/core/rpc/zmq_server.cpp new file mode 100644 index 000000000..4825108d7 --- /dev/null +++ b/src/core/rpc/zmq_server.cpp @@ -0,0 +1,78 @@ +#include "common/common_types.h" +#include "core/core.h" +#include "core/rpc/packet.h" +#include "core/rpc/zmq_server.h" + +namespace RPC { + +ZMQServer::ZMQServer(std::function)> new_request_callback) + : zmq_context(std::move(std::make_unique(1))), + zmq_socket(std::move(std::make_unique(*zmq_context, ZMQ_REP))), + new_request_callback(std::move(new_request_callback)) { + // Use a random high port + // TODO: Make configurable or increment port number on failure + zmq_socket->bind("tcp://127.0.0.1:45987"); + LOG_INFO(RPC_Server, "ZeroMQ listening on port 45987"); + + worker_thread = std::thread(&ZMQServer::WorkerLoop, this); +} + +ZMQServer::~ZMQServer() { + // Triggering the zmq_context destructor will cancel + // any blocking calls to zmq_socket->recv() + running = false; + zmq_context.reset(); + worker_thread.join(); + + LOG_INFO(RPC_Server, "ZeroMQ stopped"); +} + +void ZMQServer::WorkerLoop() { + zmq::message_t request; + while (running) { + try { + if (zmq_socket->recv(&request, 0)) { + if (request.size() >= MIN_PACKET_SIZE && request.size() <= MAX_PACKET_SIZE) { + u8* request_buffer = static_cast(request.data()); + PacketHeader header; + std::memcpy(&header, request_buffer, sizeof(header)); + if ((request.size() - MIN_PACKET_SIZE) == header.packet_size) { + u8* data = request_buffer + MIN_PACKET_SIZE; + std::function send_reply_callback = + std::bind(&ZMQServer::SendReply, this, std::placeholders::_1); + std::unique_ptr new_packet = + std::make_unique(header, data, send_reply_callback); + + // Send the request to the upper layer for handling + new_request_callback(std::move(new_packet)); + } + } + } + } catch (...) { + LOG_WARNING(RPC_Server, "Failed to receive data on ZeroMQ socket"); + } + } + + // Destroying the socket must be done by this thread. + zmq_socket.reset(); +} + +void ZMQServer::SendReply(Packet& reply_packet) { + if (running) { + auto reply_buffer = + std::make_unique(MIN_PACKET_SIZE + reply_packet.GetPacketDataSize()); + auto reply_header = reply_packet.GetHeader(); + + std::memcpy(reply_buffer.get(), &reply_header, sizeof(reply_header)); + std::memcpy(reply_buffer.get() + (4 * sizeof(u32)), reply_packet.GetPacketData().data(), + reply_packet.GetPacketDataSize()); + + zmq_socket->send(reply_buffer.get(), MIN_PACKET_SIZE + reply_packet.GetPacketDataSize()); + + LOG_INFO(RPC_Server, "Sent reply version({}) id=({}) type=({}) size=({})", + reply_packet.GetVersion(), reply_packet.GetId(), + static_cast(reply_packet.GetPacketType()), reply_packet.GetPacketDataSize()); + } +} + +}; // namespace RPC diff --git a/src/core/rpc/zmq_server.h b/src/core/rpc/zmq_server.h new file mode 100644 index 000000000..784fccf5a --- /dev/null +++ b/src/core/rpc/zmq_server.h @@ -0,0 +1,34 @@ +// Copyright 2018 Citra Emulator Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#define ZMQ_STATIC +#include + +namespace RPC { + +class Packet; + +class ZMQServer { +public: + explicit ZMQServer(std::function)> new_request_callback); + ~ZMQServer(); + +private: + void WorkerLoop(); + void SendReply(Packet& request); + + std::thread worker_thread; + std::atomic_bool running = true; + + std::unique_ptr zmq_context; + std::unique_ptr zmq_socket; + + std::function)> new_request_callback; +}; + +} // namespace RPC