Re-apply "[ORC] Unify task dispatch across ExecutionSession and..." with fix.
This re-applies6094b3b7db, which was reverted ina28557aadddue to broken bots. As far as I can tell all failures were due to a missing #include <deque>, which has been adedd in this commit.
This commit is contained in:
@@ -1443,9 +1443,6 @@ public:
|
||||
/// Send a result to the remote.
|
||||
using SendResultFunction = unique_function<void(shared::WrapperFunctionResult)>;
|
||||
|
||||
/// For dispatching ORC tasks (typically materialization tasks).
|
||||
using DispatchTaskFunction = unique_function<void(std::unique_ptr<Task> T)>;
|
||||
|
||||
/// An asynchronous wrapper-function callable from the executor via
|
||||
/// jit-dispatch.
|
||||
using JITDispatchHandlerFunction = unique_function<void(
|
||||
@@ -1568,12 +1565,6 @@ public:
|
||||
/// Unhandled errors can be sent here to log them.
|
||||
void reportError(Error Err) { ReportError(std::move(Err)); }
|
||||
|
||||
/// Set the task dispatch function.
|
||||
ExecutionSession &setDispatchTask(DispatchTaskFunction DispatchTask) {
|
||||
this->DispatchTask = std::move(DispatchTask);
|
||||
return *this;
|
||||
}
|
||||
|
||||
/// Search the given JITDylibs to find the flags associated with each of the
|
||||
/// given symbols.
|
||||
void lookupFlags(LookupKind K, JITDylibSearchOrder SearchOrder,
|
||||
@@ -1648,7 +1639,7 @@ public:
|
||||
void dispatchTask(std::unique_ptr<Task> T) {
|
||||
assert(T && "T must be non-null");
|
||||
DEBUG_WITH_TYPE("orc", dumpDispatchInfo(*T));
|
||||
DispatchTask(std::move(T));
|
||||
EPC->getDispatcher().dispatch(std::move(T));
|
||||
}
|
||||
|
||||
/// Run a wrapper function in the executor.
|
||||
@@ -1762,8 +1753,6 @@ private:
|
||||
logAllUnhandledErrors(std::move(Err), errs(), "JIT session error: ");
|
||||
}
|
||||
|
||||
static void runOnCurrentThread(std::unique_ptr<Task> T) { T->run(); }
|
||||
|
||||
void dispatchOutstandingMUs();
|
||||
|
||||
static std::unique_ptr<MaterializationResponsibility>
|
||||
@@ -1869,7 +1858,6 @@ private:
|
||||
std::unique_ptr<ExecutorProcessControl> EPC;
|
||||
std::unique_ptr<Platform> P;
|
||||
ErrorReporter ReportError = logErrorsToStdErr;
|
||||
DispatchTaskFunction DispatchTask = runOnCurrentThread;
|
||||
|
||||
std::vector<ResourceManager *> ResourceManagers;
|
||||
|
||||
|
||||
@@ -254,7 +254,6 @@ protected:
|
||||
|
||||
DataLayout DL;
|
||||
Triple TT;
|
||||
std::unique_ptr<DefaultThreadPool> CompileThreads;
|
||||
|
||||
std::unique_ptr<ObjectLayer> ObjLinkingLayer;
|
||||
std::unique_ptr<ObjectTransformLayer> ObjTransformLayer;
|
||||
@@ -325,6 +324,7 @@ public:
|
||||
PlatformSetupFunction SetUpPlatform;
|
||||
NotifyCreatedFunction NotifyCreated;
|
||||
unsigned NumCompileThreads = 0;
|
||||
std::optional<bool> SupportConcurrentCompilation;
|
||||
|
||||
/// Called prior to JIT class construcion to fix up defaults.
|
||||
Error prepareForConstruction();
|
||||
@@ -333,7 +333,7 @@ public:
|
||||
template <typename JITType, typename SetterImpl, typename State>
|
||||
class LLJITBuilderSetters {
|
||||
public:
|
||||
/// Set a ExecutorProcessControl for this instance.
|
||||
/// Set an ExecutorProcessControl for this instance.
|
||||
/// This should not be called if ExecutionSession has already been set.
|
||||
SetterImpl &
|
||||
setExecutorProcessControl(std::unique_ptr<ExecutorProcessControl> EPC) {
|
||||
@@ -462,19 +462,26 @@ public:
|
||||
///
|
||||
/// If this method is not called, behavior will be as if it were called with
|
||||
/// a zero argument.
|
||||
///
|
||||
/// This setting should not be used if a custom ExecutionSession or
|
||||
/// ExecutorProcessControl object is set: in those cases a custom
|
||||
/// TaskDispatcher should be used instead.
|
||||
SetterImpl &setNumCompileThreads(unsigned NumCompileThreads) {
|
||||
impl().NumCompileThreads = NumCompileThreads;
|
||||
return impl();
|
||||
}
|
||||
|
||||
/// Set an ExecutorProcessControl object.
|
||||
/// If set, this forces LLJIT concurrent compilation support to be either on
|
||||
/// or off. This controls the selection of compile function (concurrent vs
|
||||
/// single threaded) and whether or not sub-modules are cloned to new
|
||||
/// contexts for lazy emission.
|
||||
///
|
||||
/// If the platform uses ObjectLinkingLayer by default and no
|
||||
/// ObjectLinkingLayerCreator has been set then the ExecutorProcessControl
|
||||
/// object will be used to supply the memory manager for the
|
||||
/// ObjectLinkingLayer.
|
||||
SetterImpl &setExecutorProcessControl(ExecutorProcessControl &EPC) {
|
||||
impl().EPC = &EPC;
|
||||
/// If not explicitly set then concurrency support will be turned on if
|
||||
/// NumCompileThreads is set to a non-zero value, or if a custom
|
||||
/// ExecutionSession or ExecutorProcessControl instance is provided.
|
||||
SetterImpl &setSupportConcurrentCompilation(
|
||||
std::optional<bool> SupportConcurrentCompilation) {
|
||||
impl().SupportConcurrentCompilation = SupportConcurrentCompilation;
|
||||
return impl();
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
#if LLVM_ENABLE_THREADS
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#endif
|
||||
@@ -114,6 +115,9 @@ public:
|
||||
|
||||
class DynamicThreadPoolTaskDispatcher : public TaskDispatcher {
|
||||
public:
|
||||
DynamicThreadPoolTaskDispatcher(
|
||||
std::optional<size_t> MaxMaterializationThreads)
|
||||
: MaxMaterializationThreads(MaxMaterializationThreads) {}
|
||||
void dispatch(std::unique_ptr<Task> T) override;
|
||||
void shutdown() override;
|
||||
private:
|
||||
@@ -121,6 +125,10 @@ private:
|
||||
bool Running = true;
|
||||
size_t Outstanding = 0;
|
||||
std::condition_variable OutstandingCV;
|
||||
|
||||
std::optional<size_t> MaxMaterializationThreads;
|
||||
size_t NumMaterializationThreads = 0;
|
||||
std::deque<std::unique_ptr<Task>> MaterializationTaskQueue;
|
||||
};
|
||||
|
||||
#endif // LLVM_ENABLE_THREADS
|
||||
|
||||
@@ -63,7 +63,7 @@ SelfExecutorProcessControl::Create(
|
||||
|
||||
if (!D) {
|
||||
#if LLVM_ENABLE_THREADS
|
||||
D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
|
||||
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt);
|
||||
#else
|
||||
D = std::make_unique<InPlaceTaskDispatcher>();
|
||||
#endif
|
||||
|
||||
@@ -667,6 +667,37 @@ Error LLJITBuilderState::prepareForConstruction() {
|
||||
return JTMBOrErr.takeError();
|
||||
}
|
||||
|
||||
if ((ES || EPC) && NumCompileThreads)
|
||||
return make_error<StringError>(
|
||||
"NumCompileThreads cannot be used with a custom ExecutionSession or "
|
||||
"ExecutorProcessControl",
|
||||
inconvertibleErrorCode());
|
||||
|
||||
#if !LLVM_ENABLE_THREADS
|
||||
if (NumCompileThreads)
|
||||
return make_error<StringError>(
|
||||
"LLJIT num-compile-threads is " + Twine(NumCompileThreads) +
|
||||
" but LLVM was compiled with LLVM_ENABLE_THREADS=Off",
|
||||
inconvertibleErrorCode());
|
||||
#endif // !LLVM_ENABLE_THREADS
|
||||
|
||||
bool ConcurrentCompilationSettingDefaulted = !SupportConcurrentCompilation;
|
||||
if (!SupportConcurrentCompilation) {
|
||||
#if LLVM_ENABLE_THREADS
|
||||
SupportConcurrentCompilation = NumCompileThreads || ES || EPC;
|
||||
#else
|
||||
SupportConcurrentCompilation = false;
|
||||
#endif // LLVM_ENABLE_THREADS
|
||||
} else {
|
||||
#if !LLVM_ENABLE_THREADS
|
||||
if (*SupportConcurrentCompilation)
|
||||
return make_error<StringError>(
|
||||
"LLJIT concurrent compilation support requested, but LLVM was built "
|
||||
"with LLVM_ENABLE_THREADS=Off",
|
||||
inconvertibleErrorCode());
|
||||
#endif // !LLVM_ENABLE_THREADS
|
||||
}
|
||||
|
||||
LLVM_DEBUG({
|
||||
dbgs() << " JITTargetMachineBuilder is "
|
||||
<< JITTargetMachineBuilderPrinter(*JTMB, " ")
|
||||
@@ -684,11 +715,13 @@ Error LLJITBuilderState::prepareForConstruction() {
|
||||
<< (CreateCompileFunction ? "Yes" : "No") << "\n"
|
||||
<< " Custom platform-setup function: "
|
||||
<< (SetUpPlatform ? "Yes" : "No") << "\n"
|
||||
<< " Number of compile threads: " << NumCompileThreads;
|
||||
if (!NumCompileThreads)
|
||||
dbgs() << " (code will be compiled on the execution thread)\n";
|
||||
<< " Support concurrent compilation: "
|
||||
<< (*SupportConcurrentCompilation ? "Yes" : "No");
|
||||
if (ConcurrentCompilationSettingDefaulted)
|
||||
dbgs() << " (defaulted based on ES / EPC)\n";
|
||||
else
|
||||
dbgs() << "\n";
|
||||
dbgs() << " Number of compile threads: " << NumCompileThreads << "\n";
|
||||
});
|
||||
|
||||
// Create DL if not specified.
|
||||
@@ -705,7 +738,19 @@ Error LLJITBuilderState::prepareForConstruction() {
|
||||
dbgs() << "ExecutorProcessControl not specified, "
|
||||
"Creating SelfExecutorProcessControl instance\n";
|
||||
});
|
||||
if (auto EPCOrErr = SelfExecutorProcessControl::Create())
|
||||
|
||||
std::unique_ptr<TaskDispatcher> D = nullptr;
|
||||
#if LLVM_ENABLE_THREADS
|
||||
if (*SupportConcurrentCompilation) {
|
||||
std::optional<size_t> NumThreads = std ::nullopt;
|
||||
if (NumCompileThreads)
|
||||
NumThreads = NumCompileThreads;
|
||||
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(NumThreads);
|
||||
} else
|
||||
D = std::make_unique<InPlaceTaskDispatcher>();
|
||||
#endif // LLVM_ENABLE_THREADS
|
||||
if (auto EPCOrErr =
|
||||
SelfExecutorProcessControl::Create(nullptr, std::move(D), nullptr))
|
||||
EPC = std::move(*EPCOrErr);
|
||||
else
|
||||
return EPCOrErr.takeError();
|
||||
@@ -790,8 +835,6 @@ Error LLJITBuilderState::prepareForConstruction() {
|
||||
}
|
||||
|
||||
LLJIT::~LLJIT() {
|
||||
if (CompileThreads)
|
||||
CompileThreads->wait();
|
||||
if (auto Err = ES->endSession())
|
||||
ES->reportError(std::move(Err));
|
||||
}
|
||||
@@ -916,9 +959,8 @@ LLJIT::createCompileFunction(LLJITBuilderState &S,
|
||||
if (S.CreateCompileFunction)
|
||||
return S.CreateCompileFunction(std::move(JTMB));
|
||||
|
||||
// Otherwise default to creating a SimpleCompiler, or ConcurrentIRCompiler,
|
||||
// depending on the number of threads requested.
|
||||
if (S.NumCompileThreads > 0)
|
||||
// If using a custom EPC then use a ConcurrentIRCompiler by default.
|
||||
if (*S.SupportConcurrentCompilation)
|
||||
return std::make_unique<ConcurrentIRCompiler>(std::move(JTMB));
|
||||
|
||||
auto TM = JTMB.createTargetMachine();
|
||||
@@ -970,21 +1012,8 @@ LLJIT::LLJIT(LLJITBuilderState &S, Error &Err)
|
||||
std::make_unique<IRTransformLayer>(*ES, *TransformLayer);
|
||||
}
|
||||
|
||||
if (S.NumCompileThreads > 0) {
|
||||
if (*S.SupportConcurrentCompilation)
|
||||
InitHelperTransformLayer->setCloneToNewContextOnEmit(true);
|
||||
CompileThreads = std::make_unique<DefaultThreadPool>(
|
||||
hardware_concurrency(S.NumCompileThreads));
|
||||
ES->setDispatchTask([this](std::unique_ptr<Task> T) {
|
||||
// FIXME: We should be able to use move-capture here, but ThreadPool's
|
||||
// AsyncTaskTys are std::functions rather than unique_functions
|
||||
// (because MSVC's std::packaged_tasks don't support move-only types).
|
||||
// Fix this when all the above gets sorted out.
|
||||
CompileThreads->async([UnownedT = T.release()]() mutable {
|
||||
std::unique_ptr<Task> T(UnownedT);
|
||||
T->run();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (S.SetupProcessSymbolsJITDylib) {
|
||||
if (auto ProcSymsJD = S.SetupProcessSymbolsJITDylib(*this)) {
|
||||
@@ -1240,7 +1269,7 @@ LLLazyJIT::LLLazyJIT(LLLazyJITBuilderState &S, Error &Err) : LLJIT(S, Err) {
|
||||
CODLayer = std::make_unique<CompileOnDemandLayer>(
|
||||
*ES, *InitHelperTransformLayer, *LCTMgr, std::move(ISMBuilder));
|
||||
|
||||
if (S.NumCompileThreads > 0)
|
||||
if (*S.SupportConcurrentCompilation)
|
||||
CODLayer->setCloneToNewContextOnEmit(true);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#include "llvm/ExecutionEngine/Orc/TaskDispatch.h"
|
||||
#include "llvm/ExecutionEngine/Orc/Core.h"
|
||||
|
||||
namespace llvm {
|
||||
namespace orc {
|
||||
@@ -24,16 +25,52 @@ void InPlaceTaskDispatcher::shutdown() {}
|
||||
|
||||
#if LLVM_ENABLE_THREADS
|
||||
void DynamicThreadPoolTaskDispatcher::dispatch(std::unique_ptr<Task> T) {
|
||||
bool IsMaterializationTask = isa<MaterializationTask>(*T);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> Lock(DispatchMutex);
|
||||
|
||||
if (IsMaterializationTask) {
|
||||
|
||||
// If this is a materialization task and there are too many running
|
||||
// already then queue this one up and return early.
|
||||
if (MaxMaterializationThreads &&
|
||||
NumMaterializationThreads == *MaxMaterializationThreads) {
|
||||
MaterializationTaskQueue.push_back(std::move(T));
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise record that we have a materialization task running.
|
||||
++NumMaterializationThreads;
|
||||
}
|
||||
|
||||
++Outstanding;
|
||||
}
|
||||
|
||||
std::thread([this, T = std::move(T)]() mutable {
|
||||
T->run();
|
||||
std::lock_guard<std::mutex> Lock(DispatchMutex);
|
||||
--Outstanding;
|
||||
OutstandingCV.notify_all();
|
||||
std::thread([this, T = std::move(T), IsMaterializationTask]() mutable {
|
||||
while (true) {
|
||||
|
||||
// Run the task.
|
||||
T->run();
|
||||
|
||||
std::lock_guard<std::mutex> Lock(DispatchMutex);
|
||||
if (!MaterializationTaskQueue.empty()) {
|
||||
// If there are any materialization tasks running then steal that work.
|
||||
T = std::move(MaterializationTaskQueue.front());
|
||||
MaterializationTaskQueue.pop_front();
|
||||
if (!IsMaterializationTask) {
|
||||
++NumMaterializationThreads;
|
||||
IsMaterializationTask = true;
|
||||
}
|
||||
} else {
|
||||
// Otherwise decrement work counters.
|
||||
if (IsMaterializationTask)
|
||||
--NumMaterializationThreads;
|
||||
--Outstanding;
|
||||
OutstandingCV.notify_all();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}).detach();
|
||||
}
|
||||
|
||||
|
||||
@@ -807,8 +807,8 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> launchExecutor() {
|
||||
S.CreateMemoryManager = createSharedMemoryManager;
|
||||
|
||||
return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
|
||||
std::make_unique<DynamicThreadPoolTaskDispatcher>(), std::move(S),
|
||||
FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
|
||||
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
|
||||
std::move(S), FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -897,7 +897,7 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> connectToExecutor() {
|
||||
S.CreateMemoryManager = createSharedMemoryManager;
|
||||
|
||||
return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
|
||||
std::make_unique<DynamicThreadPoolTaskDispatcher>(),
|
||||
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
|
||||
std::move(S), *SockFD, *SockFD);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -1005,11 +1005,11 @@ TEST_F(CoreAPIsStandardTest, RedefineBoundWeakSymbol) {
|
||||
|
||||
TEST_F(CoreAPIsStandardTest, DefineMaterializingSymbol) {
|
||||
bool ExpectNoMoreMaterialization = false;
|
||||
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
|
||||
DispatchOverride = [&](std::unique_ptr<Task> T) {
|
||||
if (ExpectNoMoreMaterialization && isa<MaterializationTask>(*T))
|
||||
ADD_FAILURE() << "Unexpected materialization";
|
||||
T->run();
|
||||
});
|
||||
};
|
||||
|
||||
auto MU = std::make_unique<SimpleMaterializationUnit>(
|
||||
SymbolFlagsMap({{Foo, FooSym.getFlags()}}),
|
||||
@@ -1403,7 +1403,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
|
||||
|
||||
std::mutex WorkThreadsMutex;
|
||||
std::vector<std::thread> WorkThreads;
|
||||
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
|
||||
DispatchOverride = [&](std::unique_ptr<Task> T) {
|
||||
std::promise<void> WaitP;
|
||||
std::lock_guard<std::mutex> Lock(WorkThreadsMutex);
|
||||
WorkThreads.push_back(
|
||||
@@ -1412,7 +1412,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
|
||||
T->run();
|
||||
}));
|
||||
WaitP.set_value();
|
||||
});
|
||||
};
|
||||
|
||||
cantFail(JD.define(absoluteSymbols({{Foo, FooSym}})));
|
||||
|
||||
|
||||
@@ -22,3 +22,18 @@ ModuleBuilder::ModuleBuilder(LLVMContext &Context, StringRef Triple,
|
||||
if (Triple != "")
|
||||
M->setTargetTriple(Triple);
|
||||
}
|
||||
|
||||
void llvm::orc::CoreAPIsBasedStandardTest::OverridableDispatcher::dispatch(
|
||||
std::unique_ptr<Task> T) {
|
||||
if (Parent.DispatchOverride)
|
||||
Parent.DispatchOverride(std::move(T));
|
||||
else
|
||||
InPlaceTaskDispatcher::dispatch(std::move(T));
|
||||
}
|
||||
|
||||
std::unique_ptr<llvm::orc::ExecutorProcessControl>
|
||||
llvm::orc::CoreAPIsBasedStandardTest::makeEPC(
|
||||
std::shared_ptr<SymbolStringPool> SSP) {
|
||||
return std::make_unique<UnsupportedExecutorProcessControl>(
|
||||
std::move(SSP), std::make_unique<OverridableDispatcher>(*this));
|
||||
}
|
||||
|
||||
@@ -52,8 +52,20 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
class OverridableDispatcher : public InPlaceTaskDispatcher {
|
||||
public:
|
||||
OverridableDispatcher(CoreAPIsBasedStandardTest &Parent) : Parent(Parent) {}
|
||||
void dispatch(std::unique_ptr<Task> T) override;
|
||||
|
||||
private:
|
||||
CoreAPIsBasedStandardTest &Parent;
|
||||
};
|
||||
|
||||
std::unique_ptr<llvm::orc::ExecutorProcessControl>
|
||||
makeEPC(std::shared_ptr<SymbolStringPool> SSP);
|
||||
|
||||
std::shared_ptr<SymbolStringPool> SSP = std::make_shared<SymbolStringPool>();
|
||||
ExecutionSession ES{std::make_unique<UnsupportedExecutorProcessControl>(SSP)};
|
||||
ExecutionSession ES{makeEPC(SSP)};
|
||||
JITDylib &JD = ES.createBareJITDylib("JD");
|
||||
SymbolStringPtr Foo = ES.intern("foo");
|
||||
SymbolStringPtr Bar = ES.intern("bar");
|
||||
@@ -67,6 +79,7 @@ protected:
|
||||
ExecutorSymbolDef BarSym{BarAddr, JITSymbolFlags::Exported};
|
||||
ExecutorSymbolDef BazSym{BazAddr, JITSymbolFlags::Exported};
|
||||
ExecutorSymbolDef QuxSym{QuxAddr, JITSymbolFlags::Exported};
|
||||
unique_function<void(std::unique_ptr<Task>)> DispatchOverride;
|
||||
};
|
||||
|
||||
} // end namespace orc
|
||||
|
||||
@@ -24,7 +24,7 @@ TEST(InPlaceTaskDispatchTest, GenericNamedTask) {
|
||||
|
||||
#if LLVM_ENABLE_THREADS
|
||||
TEST(DynamicThreadPoolDispatchTest, GenericNamedTask) {
|
||||
auto D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
|
||||
auto D = std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt);
|
||||
std::promise<bool> P;
|
||||
auto F = P.get_future();
|
||||
D->dispatch(makeGenericNamedTask(
|
||||
|
||||
Reference in New Issue
Block a user