Files
clang-p2996/llvm/lib/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.cpp
Lang Hames 060319a278 [ORC] Add a generic bootstrap key-value store to SimpleRemoteEPC.
SimpleRemoteEPC already included a "bootstrap symbols map" that could be used
to communicate the addresses of symbols needed for JIT bootstrap. The new
bootstrap map can be used to communicate arbitrary bootstrap values (encoded as
SPS buffers).

The bootstrap symbols map is kept as distinct becasue bootstrap symbols are
significant, and having a known value type for them allows for better debug
logging (we know how to render the values) and tooling (e.g. utils for adding
all bootstrap symbols to a JITDylib).
2023-04-02 10:27:06 -07:00

294 lines
10 KiB
C++

//===------- SimpleEPCServer.cpp - EPC over simple abstract channel -------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
#include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/Process.h"
#include "llvm/TargetParser/Host.h"
#include "OrcRTBootstrap.h"
#define DEBUG_TYPE "orc"
using namespace llvm::orc::shared;
namespace llvm {
namespace orc {
ExecutorBootstrapService::~ExecutorBootstrapService() = default;
SimpleRemoteEPCServer::Dispatcher::~Dispatcher() = default;
#if LLVM_ENABLE_THREADS
void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
unique_function<void()> Work) {
{
std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!Running)
return;
++Outstanding;
}
std::thread([this, Work = std::move(Work)]() mutable {
Work();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
}).detach();
}
void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
std::unique_lock<std::mutex> Lock(DispatchMutex);
Running = false;
OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
}
#endif
StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
StringMap<ExecutorAddr> DBS;
rt_bootstrap::addTo(DBS);
return DBS;
}
Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
ExecutorAddr TagAddr,
SimpleRemoteEPCArgBytesVector ArgBytes) {
LLVM_DEBUG({
dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
switch (OpC) {
case SimpleRemoteEPCOpcode::Setup:
dbgs() << "Setup";
assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
assert(!TagAddr && "Non-zero TagAddr for Setup?");
break;
case SimpleRemoteEPCOpcode::Hangup:
dbgs() << "Hangup";
assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
assert(!TagAddr && "Non-zero TagAddr for Hangup?");
break;
case SimpleRemoteEPCOpcode::Result:
dbgs() << "Result";
assert(!TagAddr && "Non-zero TagAddr for Result?");
break;
case SimpleRemoteEPCOpcode::CallWrapper:
dbgs() << "CallWrapper";
break;
}
dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
<< ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
<< " bytes\n";
});
using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
return make_error<StringError>("Unexpected opcode",
inconvertibleErrorCode());
// TODO: Clean detach message?
switch (OpC) {
case SimpleRemoteEPCOpcode::Setup:
return make_error<StringError>("Unexpected Setup opcode",
inconvertibleErrorCode());
case SimpleRemoteEPCOpcode::Hangup:
return SimpleRemoteEPCTransportClient::EndSession;
case SimpleRemoteEPCOpcode::Result:
if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes)))
return std::move(Err);
break;
case SimpleRemoteEPCOpcode::CallWrapper:
handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes));
break;
}
return ContinueSession;
}
Error SimpleRemoteEPCServer::waitForDisconnect() {
std::unique_lock<std::mutex> Lock(ServerStateMutex);
ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; });
return std::move(ShutdownErr);
}
void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
PendingJITDispatchResultsMap TmpPending;
{
std::lock_guard<std::mutex> Lock(ServerStateMutex);
std::swap(TmpPending, PendingJITDispatchResults);
RunState = ServerShuttingDown;
}
// Send out-of-band errors to any waiting threads.
for (auto &KV : TmpPending)
KV.second->set_value(
shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
// Wait for dispatcher to clear.
D->shutdown();
// Shut down services.
while (!Services.empty()) {
ShutdownErr =
joinErrors(std::move(ShutdownErr), Services.back()->shutdown());
Services.pop_back();
}
std::lock_guard<std::mutex> Lock(ServerStateMutex);
ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err));
RunState = ServerShutDown;
ShutdownCV.notify_all();
}
Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC,
uint64_t SeqNo, ExecutorAddr TagAddr,
ArrayRef<char> ArgBytes) {
LLVM_DEBUG({
dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
switch (OpC) {
case SimpleRemoteEPCOpcode::Setup:
dbgs() << "Setup";
assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
assert(!TagAddr && "Non-zero TagAddr for Setup?");
break;
case SimpleRemoteEPCOpcode::Hangup:
dbgs() << "Hangup";
assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
assert(!TagAddr && "Non-zero TagAddr for Hangup?");
break;
case SimpleRemoteEPCOpcode::Result:
dbgs() << "Result";
assert(!TagAddr && "Non-zero TagAddr for Result?");
break;
case SimpleRemoteEPCOpcode::CallWrapper:
dbgs() << "CallWrapper";
break;
}
dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
<< ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
<< " bytes\n";
});
auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
LLVM_DEBUG({
if (Err)
dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
});
return Err;
}
Error SimpleRemoteEPCServer::sendSetupMessage(
StringMap<std::vector<char>> BootstrapMap,
StringMap<ExecutorAddr> BootstrapSymbols) {
using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
std::vector<char> SetupPacket;
SimpleRemoteEPCExecutorInfo EI;
EI.TargetTriple = sys::getProcessTriple();
if (auto PageSize = sys::Process::getPageSize())
EI.PageSize = *PageSize;
else
return PageSize.takeError();
EI.BootstrapMap = std::move(BootstrapMap);
EI.BootstrapSymbols = std::move(BootstrapSymbols);
assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
"Dispatch context name should not be set");
assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
"Dispatch function name should not be set");
EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this);
EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry);
using SPSSerialize =
shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
auto SetupPacketBytes =
shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI));
shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
if (!SPSSerialize::serialize(OB, EI))
return make_error<StringError>("Could not send setup packet",
inconvertibleErrorCode());
return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(),
{SetupPacketBytes.data(), SetupPacketBytes.size()});
}
Error SimpleRemoteEPCServer::handleResult(
uint64_t SeqNo, ExecutorAddr TagAddr,
SimpleRemoteEPCArgBytesVector ArgBytes) {
std::promise<shared::WrapperFunctionResult> *P = nullptr;
{
std::lock_guard<std::mutex> Lock(ServerStateMutex);
auto I = PendingJITDispatchResults.find(SeqNo);
if (I == PendingJITDispatchResults.end())
return make_error<StringError>("No call for sequence number " +
Twine(SeqNo),
inconvertibleErrorCode());
P = I->second;
PendingJITDispatchResults.erase(I);
releaseSeqNo(SeqNo);
}
auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size());
memcpy(R.data(), ArgBytes.data(), ArgBytes.size());
P->set_value(std::move(R));
return Error::success();
}
void SimpleRemoteEPCServer::handleCallWrapper(
uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
SimpleRemoteEPCArgBytesVector ArgBytes) {
D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
using WrapperFnTy =
shared::CWrapperFunctionResult (*)(const char *, size_t);
auto *Fn = TagAddr.toPtr<WrapperFnTy>();
shared::WrapperFunctionResult ResultBytes(
Fn(ArgBytes.data(), ArgBytes.size()));
if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo,
ExecutorAddr(),
{ResultBytes.data(), ResultBytes.size()}))
ReportError(std::move(Err));
});
}
shared::WrapperFunctionResult
SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
size_t ArgSize) {
uint64_t SeqNo;
std::promise<shared::WrapperFunctionResult> ResultP;
auto ResultF = ResultP.get_future();
{
std::lock_guard<std::mutex> Lock(ServerStateMutex);
if (RunState != ServerRunning)
return shared::WrapperFunctionResult::createOutOfBandError(
"jit_dispatch not available (EPC server shut down)");
SeqNo = getNextSeqNo();
assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
PendingJITDispatchResults[SeqNo] = &ResultP;
}
if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize}))
ReportError(std::move(Err));
return ResultF.get();
}
shared::CWrapperFunctionResult
SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
const char *ArgData, size_t ArgSize) {
return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
->doJITDispatch(FnTag, ArgData, ArgSize)
.release();
}
} // end namespace orc
} // end namespace llvm