//===-- Generic RPC server interface --------------------------------------===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// #ifndef LLVM_LIBC_UTILS_GPU_LOADER_RPC_H #define LLVM_LIBC_UTILS_GPU_LOADER_RPC_H #include #include #include #include #include #include "src/__support/RPC/rpc.h" static __llvm_libc::rpc::Server server; /// Queries the RPC client at least once and performs server-side work if there /// are any active requests. template void handle_server(Alloc allocator, Dealloc deallocator) { using namespace __llvm_libc; // Continue servicing the client until there is no work left and we return. for (;;) { auto port = server.try_open(); if (!port) return; switch (port->get_opcode()) { case rpc::Opcode::WRITE_TO_STREAM: case rpc::Opcode::WRITE_TO_STDERR: case rpc::Opcode::WRITE_TO_STDOUT: { uint64_t sizes[rpc::MAX_LANE_SIZE] = {0}; void *strs[rpc::MAX_LANE_SIZE] = {nullptr}; FILE *files[rpc::MAX_LANE_SIZE] = {nullptr}; if (port->get_opcode() == rpc::Opcode::WRITE_TO_STREAM) port->recv([&](rpc::Buffer *buffer, uint32_t id) { files[id] = reinterpret_cast(buffer->data[0]); }); port->recv_n(strs, sizes, [&](uint64_t size) { return new char[size]; }); port->send([&](rpc::Buffer *buffer, uint32_t id) { FILE *file = port->get_opcode() == rpc::Opcode::WRITE_TO_STDOUT ? stdout : (port->get_opcode() == rpc::Opcode::WRITE_TO_STDERR ? stderr : files[id]); int ret = fwrite(strs[id], sizes[id], 1, file); reinterpret_cast(buffer->data)[0] = ret >= 0 ? sizes[id] : ret; }); for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) { if (strs[i]) delete[] reinterpret_cast(strs[i]); } break; } case rpc::Opcode::EXIT: { port->recv([](rpc::Buffer *buffer) { exit(reinterpret_cast(buffer->data)[0]); }); break; } case rpc::Opcode::MALLOC: { port->recv_and_send([&](rpc::Buffer *buffer) { buffer->data[0] = reinterpret_cast(allocator(buffer->data[0])); }); break; } case rpc::Opcode::FREE: { port->recv([&](rpc::Buffer *buffer) { deallocator(reinterpret_cast(buffer->data[0])); }); break; } case rpc::Opcode::TEST_INCREMENT: { port->recv_and_send([](rpc::Buffer *buffer) { reinterpret_cast(buffer->data)[0] += 1; }); break; } case rpc::Opcode::TEST_INTERFACE: { uint64_t cnt = 0; bool end_with_recv; port->recv([&](rpc::Buffer *buffer) { end_with_recv = buffer->data[0]; }); port->recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; }); port->send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; }); port->recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; }); port->send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; }); port->recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; }); port->recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; }); port->send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; }); port->send([&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; }); if (end_with_recv) port->recv([&](rpc::Buffer *buffer) { cnt = buffer->data[0]; }); else port->send( [&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; }); break; } case rpc::Opcode::TEST_STREAM: { uint64_t sizes[rpc::MAX_LANE_SIZE] = {0}; void *dst[rpc::MAX_LANE_SIZE] = {nullptr}; port->recv_n(dst, sizes, [](uint64_t size) { return new char[size]; }); port->send_n(dst, sizes); for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) { if (dst[i]) delete[] reinterpret_cast(dst[i]); } break; } default: port->recv([](rpc::Buffer *buffer) {}); } port->close(); } } #endif