Re-apply "[ORC] Unify task dispatch across ExecutionSession and..." with fix.

This re-applies 6094b3b7db, which was reverted in a28557aadd due to broken
bots. As far as I can tell all failures were due to a missing #include <deque>,
which has been added in this commit.
This commit is contained in:
Lang Hames
2024-04-22 15:44:07 -08:00
parent dc3f94384d
commit 1effa19de2
11 changed files with 158 additions and 61 deletions

View File

@@ -1443,9 +1443,6 @@ public:
/// Send a result to the remote.
using SendResultFunction = unique_function<void(shared::WrapperFunctionResult)>;
/// For dispatching ORC tasks (typically materialization tasks).
using DispatchTaskFunction = unique_function<void(std::unique_ptr<Task> T)>;
/// An asynchronous wrapper-function callable from the executor via
/// jit-dispatch.
using JITDispatchHandlerFunction = unique_function<void(
@@ -1568,12 +1565,6 @@ public:
/// Unhandled errors can be sent here to log them.
void reportError(Error Err) { ReportError(std::move(Err)); }
/// Set the task dispatch function.
ExecutionSession &setDispatchTask(DispatchTaskFunction DispatchTask) {
this->DispatchTask = std::move(DispatchTask);
return *this;
}
/// Search the given JITDylibs to find the flags associated with each of the
/// given symbols.
void lookupFlags(LookupKind K, JITDylibSearchOrder SearchOrder,
@@ -1648,7 +1639,7 @@ public:
void dispatchTask(std::unique_ptr<Task> T) {
assert(T && "T must be non-null");
DEBUG_WITH_TYPE("orc", dumpDispatchInfo(*T));
DispatchTask(std::move(T));
EPC->getDispatcher().dispatch(std::move(T));
}
/// Run a wrapper function in the executor.
@@ -1762,8 +1753,6 @@ private:
logAllUnhandledErrors(std::move(Err), errs(), "JIT session error: ");
}
static void runOnCurrentThread(std::unique_ptr<Task> T) { T->run(); }
void dispatchOutstandingMUs();
static std::unique_ptr<MaterializationResponsibility>
@@ -1869,7 +1858,6 @@ private:
std::unique_ptr<ExecutorProcessControl> EPC;
std::unique_ptr<Platform> P;
ErrorReporter ReportError = logErrorsToStdErr;
DispatchTaskFunction DispatchTask = runOnCurrentThread;
std::vector<ResourceManager *> ResourceManagers;

View File

@@ -254,7 +254,6 @@ protected:
DataLayout DL;
Triple TT;
std::unique_ptr<DefaultThreadPool> CompileThreads;
std::unique_ptr<ObjectLayer> ObjLinkingLayer;
std::unique_ptr<ObjectTransformLayer> ObjTransformLayer;
@@ -325,6 +324,7 @@ public:
PlatformSetupFunction SetUpPlatform;
NotifyCreatedFunction NotifyCreated;
unsigned NumCompileThreads = 0;
std::optional<bool> SupportConcurrentCompilation;
/// Called prior to JIT class construction to fix up defaults.
Error prepareForConstruction();
@@ -333,7 +333,7 @@ public:
template <typename JITType, typename SetterImpl, typename State>
class LLJITBuilderSetters {
public:
/// Set a ExecutorProcessControl for this instance.
/// Set an ExecutorProcessControl for this instance.
/// This should not be called if ExecutionSession has already been set.
SetterImpl &
setExecutorProcessControl(std::unique_ptr<ExecutorProcessControl> EPC) {
@@ -462,19 +462,26 @@ public:
///
/// If this method is not called, behavior will be as if it were called with
/// a zero argument.
///
/// This setting should not be used if a custom ExecutionSession or
/// ExecutorProcessControl object is set: in those cases a custom
/// TaskDispatcher should be used instead.
SetterImpl &setNumCompileThreads(unsigned NumCompileThreads) {
impl().NumCompileThreads = NumCompileThreads;
return impl();
}
/// Set an ExecutorProcessControl object.
/// If set, this forces LLJIT concurrent compilation support to be either on
/// or off. This controls the selection of compile function (concurrent vs
/// single threaded) and whether or not sub-modules are cloned to new
/// contexts for lazy emission.
///
/// If the platform uses ObjectLinkingLayer by default and no
/// ObjectLinkingLayerCreator has been set then the ExecutorProcessControl
/// object will be used to supply the memory manager for the
/// ObjectLinkingLayer.
SetterImpl &setExecutorProcessControl(ExecutorProcessControl &EPC) {
impl().EPC = &EPC;
/// If not explicitly set then concurrency support will be turned on if
/// NumCompileThreads is set to a non-zero value, or if a custom
/// ExecutionSession or ExecutorProcessControl instance is provided.
SetterImpl &setSupportConcurrentCompilation(
std::optional<bool> SupportConcurrentCompilation) {
impl().SupportConcurrentCompilation = SupportConcurrentCompilation;
return impl();
}

View File

@@ -23,6 +23,7 @@
#if LLVM_ENABLE_THREADS
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#endif
@@ -114,6 +115,9 @@ public:
class DynamicThreadPoolTaskDispatcher : public TaskDispatcher {
public:
/// Construct a dispatcher with an optional cap on materialization threads.
///
/// If MaxMaterializationThreads is set, dispatch() queues any
/// MaterializationTask that arrives while that many materialization threads
/// are already running; a queued task is picked up later by a thread that has
/// finished its current task. If it is std::nullopt no limit is applied.
DynamicThreadPoolTaskDispatcher(
std::optional<size_t> MaxMaterializationThreads)
: MaxMaterializationThreads(MaxMaterializationThreads) {}
void dispatch(std::unique_ptr<Task> T) override;
void shutdown() override;
private:
@@ -121,6 +125,10 @@ private:
bool Running = true;
size_t Outstanding = 0;
std::condition_variable OutstandingCV;
std::optional<size_t> MaxMaterializationThreads;
size_t NumMaterializationThreads = 0;
std::deque<std::unique_ptr<Task>> MaterializationTaskQueue;
};
#endif // LLVM_ENABLE_THREADS

View File

@@ -63,7 +63,7 @@ SelfExecutorProcessControl::Create(
if (!D) {
#if LLVM_ENABLE_THREADS
D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt);
#else
D = std::make_unique<InPlaceTaskDispatcher>();
#endif

View File

@@ -667,6 +667,37 @@ Error LLJITBuilderState::prepareForConstruction() {
return JTMBOrErr.takeError();
}
if ((ES || EPC) && NumCompileThreads)
return make_error<StringError>(
"NumCompileThreads cannot be used with a custom ExecutionSession or "
"ExecutorProcessControl",
inconvertibleErrorCode());
#if !LLVM_ENABLE_THREADS
if (NumCompileThreads)
return make_error<StringError>(
"LLJIT num-compile-threads is " + Twine(NumCompileThreads) +
" but LLVM was compiled with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS
bool ConcurrentCompilationSettingDefaulted = !SupportConcurrentCompilation;
if (!SupportConcurrentCompilation) {
#if LLVM_ENABLE_THREADS
SupportConcurrentCompilation = NumCompileThreads || ES || EPC;
#else
SupportConcurrentCompilation = false;
#endif // LLVM_ENABLE_THREADS
} else {
#if !LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation)
return make_error<StringError>(
"LLJIT concurrent compilation support requested, but LLVM was built "
"with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS
}
LLVM_DEBUG({
dbgs() << " JITTargetMachineBuilder is "
<< JITTargetMachineBuilderPrinter(*JTMB, " ")
@@ -684,11 +715,13 @@ Error LLJITBuilderState::prepareForConstruction() {
<< (CreateCompileFunction ? "Yes" : "No") << "\n"
<< " Custom platform-setup function: "
<< (SetUpPlatform ? "Yes" : "No") << "\n"
<< " Number of compile threads: " << NumCompileThreads;
if (!NumCompileThreads)
dbgs() << " (code will be compiled on the execution thread)\n";
<< " Support concurrent compilation: "
<< (*SupportConcurrentCompilation ? "Yes" : "No");
if (ConcurrentCompilationSettingDefaulted)
dbgs() << " (defaulted based on ES / EPC)\n";
else
dbgs() << "\n";
dbgs() << " Number of compile threads: " << NumCompileThreads << "\n";
});
// Create DL if not specified.
@@ -705,7 +738,19 @@ Error LLJITBuilderState::prepareForConstruction() {
dbgs() << "ExecutorProcessControl not specified, "
"Creating SelfExecutorProcessControl instance\n";
});
if (auto EPCOrErr = SelfExecutorProcessControl::Create())
std::unique_ptr<TaskDispatcher> D = nullptr;
#if LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation) {
std::optional<size_t> NumThreads = std ::nullopt;
if (NumCompileThreads)
NumThreads = NumCompileThreads;
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(NumThreads);
} else
D = std::make_unique<InPlaceTaskDispatcher>();
#endif // LLVM_ENABLE_THREADS
if (auto EPCOrErr =
SelfExecutorProcessControl::Create(nullptr, std::move(D), nullptr))
EPC = std::move(*EPCOrErr);
else
return EPCOrErr.takeError();
@@ -790,8 +835,6 @@ Error LLJITBuilderState::prepareForConstruction() {
}
LLJIT::~LLJIT() {
if (CompileThreads)
CompileThreads->wait();
if (auto Err = ES->endSession())
ES->reportError(std::move(Err));
}
@@ -916,9 +959,8 @@ LLJIT::createCompileFunction(LLJITBuilderState &S,
if (S.CreateCompileFunction)
return S.CreateCompileFunction(std::move(JTMB));
// Otherwise default to creating a SimpleCompiler, or ConcurrentIRCompiler,
// depending on the number of threads requested.
if (S.NumCompileThreads > 0)
// If concurrent compilation support is enabled then use a
// ConcurrentIRCompiler by default.
if (*S.SupportConcurrentCompilation)
return std::make_unique<ConcurrentIRCompiler>(std::move(JTMB));
auto TM = JTMB.createTargetMachine();
@@ -970,21 +1012,8 @@ LLJIT::LLJIT(LLJITBuilderState &S, Error &Err)
std::make_unique<IRTransformLayer>(*ES, *TransformLayer);
}
if (S.NumCompileThreads > 0) {
if (*S.SupportConcurrentCompilation)
InitHelperTransformLayer->setCloneToNewContextOnEmit(true);
CompileThreads = std::make_unique<DefaultThreadPool>(
hardware_concurrency(S.NumCompileThreads));
ES->setDispatchTask([this](std::unique_ptr<Task> T) {
// FIXME: We should be able to use move-capture here, but ThreadPool's
// AsyncTaskTys are std::functions rather than unique_functions
// (because MSVC's std::packaged_tasks don't support move-only types).
// Fix this when all the above gets sorted out.
CompileThreads->async([UnownedT = T.release()]() mutable {
std::unique_ptr<Task> T(UnownedT);
T->run();
});
});
}
if (S.SetupProcessSymbolsJITDylib) {
if (auto ProcSymsJD = S.SetupProcessSymbolsJITDylib(*this)) {
@@ -1240,7 +1269,7 @@ LLLazyJIT::LLLazyJIT(LLLazyJITBuilderState &S, Error &Err) : LLJIT(S, Err) {
CODLayer = std::make_unique<CompileOnDemandLayer>(
*ES, *InitHelperTransformLayer, *LCTMgr, std::move(ISMBuilder));
if (S.NumCompileThreads > 0)
if (*S.SupportConcurrentCompilation)
CODLayer->setCloneToNewContextOnEmit(true);
}

View File

@@ -7,6 +7,7 @@
//===----------------------------------------------------------------------===//
#include "llvm/ExecutionEngine/Orc/TaskDispatch.h"
#include "llvm/ExecutionEngine/Orc/Core.h"
namespace llvm {
namespace orc {
@@ -24,16 +25,52 @@ void InPlaceTaskDispatcher::shutdown() {}
#if LLVM_ENABLE_THREADS
void DynamicThreadPoolTaskDispatcher::dispatch(std::unique_ptr<Task> T) {
bool IsMaterializationTask = isa<MaterializationTask>(*T);
{
std::lock_guard<std::mutex> Lock(DispatchMutex);
if (IsMaterializationTask) {
// If this is a materialization task and there are too many running
// already then queue this one up and return early.
if (MaxMaterializationThreads &&
NumMaterializationThreads == *MaxMaterializationThreads) {
MaterializationTaskQueue.push_back(std::move(T));
return;
}
// Otherwise record that we have a materialization task running.
++NumMaterializationThreads;
}
++Outstanding;
}
std::thread([this, T = std::move(T)]() mutable {
T->run();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
std::thread([this, T = std::move(T), IsMaterializationTask]() mutable {
while (true) {
// Run the task.
T->run();
std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!MaterializationTaskQueue.empty()) {
// If there are any materialization tasks queued then steal that work.
T = std::move(MaterializationTaskQueue.front());
MaterializationTaskQueue.pop_front();
if (!IsMaterializationTask) {
++NumMaterializationThreads;
IsMaterializationTask = true;
}
} else {
// Otherwise decrement work counters.
if (IsMaterializationTask)
--NumMaterializationThreads;
--Outstanding;
OutstandingCV.notify_all();
return;
}
}
}).detach();
}

View File

@@ -807,8 +807,8 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> launchExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;
return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(), std::move(S),
FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::move(S), FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
#endif
}
@@ -897,7 +897,7 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> connectToExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;
return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(),
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::move(S), *SockFD, *SockFD);
#endif
}

View File

@@ -1005,11 +1005,11 @@ TEST_F(CoreAPIsStandardTest, RedefineBoundWeakSymbol) {
TEST_F(CoreAPIsStandardTest, DefineMaterializingSymbol) {
bool ExpectNoMoreMaterialization = false;
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
DispatchOverride = [&](std::unique_ptr<Task> T) {
if (ExpectNoMoreMaterialization && isa<MaterializationTask>(*T))
ADD_FAILURE() << "Unexpected materialization";
T->run();
});
};
auto MU = std::make_unique<SimpleMaterializationUnit>(
SymbolFlagsMap({{Foo, FooSym.getFlags()}}),
@@ -1403,7 +1403,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
std::mutex WorkThreadsMutex;
std::vector<std::thread> WorkThreads;
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
DispatchOverride = [&](std::unique_ptr<Task> T) {
std::promise<void> WaitP;
std::lock_guard<std::mutex> Lock(WorkThreadsMutex);
WorkThreads.push_back(
@@ -1412,7 +1412,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
T->run();
}));
WaitP.set_value();
});
};
cantFail(JD.define(absoluteSymbols({{Foo, FooSym}})));

View File

@@ -22,3 +22,18 @@ ModuleBuilder::ModuleBuilder(LLVMContext &Context, StringRef Triple,
if (Triple != "")
M->setTargetTriple(Triple);
}
/// Route a task through the fixture's DispatchOverride callback when one has
/// been installed; otherwise fall back to InPlaceTaskDispatcher::dispatch.
void llvm::orc::CoreAPIsBasedStandardTest::OverridableDispatcher::dispatch(
    std::unique_ptr<Task> T) {
  // No override installed: use the inherited in-place behavior.
  if (!Parent.DispatchOverride) {
    InPlaceTaskDispatcher::dispatch(std::move(T));
    return;
  }

  // A test has supplied its own dispatch logic: hand the task over to it.
  Parent.DispatchOverride(std::move(T));
}
/// Build the ExecutorProcessControl used by the fixture's ExecutionSession:
/// an UnsupportedExecutorProcessControl that shares the given symbol string
/// pool and dispatches tasks through an OverridableDispatcher bound to this
/// fixture.
std::unique_ptr<llvm::orc::ExecutorProcessControl>
llvm::orc::CoreAPIsBasedStandardTest::makeEPC(
    std::shared_ptr<SymbolStringPool> SSP) {
  auto Dispatcher = std::make_unique<OverridableDispatcher>(*this);
  return std::make_unique<UnsupportedExecutorProcessControl>(
      std::move(SSP), std::move(Dispatcher));
}

View File

@@ -52,8 +52,20 @@ public:
}
protected:
/// Task dispatcher for tests: derives from InPlaceTaskDispatcher and
/// overrides dispatch() so that a test can intercept dispatched tasks through
/// the owning fixture's DispatchOverride callback. When no callback is set,
/// dispatch() defers to InPlaceTaskDispatcher::dispatch.
class OverridableDispatcher : public InPlaceTaskDispatcher {
public:
/// Bind this dispatcher to the fixture whose DispatchOverride it consults.
/// The fixture is held by reference, not owned.
OverridableDispatcher(CoreAPIsBasedStandardTest &Parent) : Parent(Parent) {}
/// Forward T to Parent.DispatchOverride if set, else dispatch it in place.
void dispatch(std::unique_ptr<Task> T) override;
private:
// Fixture that supplies the optional DispatchOverride callback.
CoreAPIsBasedStandardTest &Parent;
};
std::unique_ptr<llvm::orc::ExecutorProcessControl>
makeEPC(std::shared_ptr<SymbolStringPool> SSP);
std::shared_ptr<SymbolStringPool> SSP = std::make_shared<SymbolStringPool>();
ExecutionSession ES{std::make_unique<UnsupportedExecutorProcessControl>(SSP)};
ExecutionSession ES{makeEPC(SSP)};
JITDylib &JD = ES.createBareJITDylib("JD");
SymbolStringPtr Foo = ES.intern("foo");
SymbolStringPtr Bar = ES.intern("bar");
@@ -67,6 +79,7 @@ protected:
ExecutorSymbolDef BarSym{BarAddr, JITSymbolFlags::Exported};
ExecutorSymbolDef BazSym{BazAddr, JITSymbolFlags::Exported};
ExecutorSymbolDef QuxSym{QuxAddr, JITSymbolFlags::Exported};
unique_function<void(std::unique_ptr<Task>)> DispatchOverride;
};
} // end namespace orc

View File

@@ -24,7 +24,7 @@ TEST(InPlaceTaskDispatchTest, GenericNamedTask) {
#if LLVM_ENABLE_THREADS
TEST(DynamicThreadPoolDispatchTest, GenericNamedTask) {
auto D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
auto D = std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt);
std::promise<bool> P;
auto F = P.get_future();
D->dispatch(makeGenericNamedTask(