#pragma once #include "uv.h" #include "llvm/ADT/SmallVector.h" #include "llvm/ADT/FunctionExtras.h" #include "llvm/Support/raw_ostream.h" #include "Support/JSON.h" #include #include #include #include #include #include #include #include #include namespace clice::async { template struct promise; extern uv_loop_t* loop; template T& uv_cast(U* u) { assert(u && u->data && "uv_cast: invalid uv handle"); return *static_cast(u->data); } using Callback = llvm::unique_function(json::Value)>; void start_server(Callback callback); void start_server(Callback callback, const char* ip, unsigned int port); void write(json::Value id, json::Value result); template struct result { union { Value value; }; result() {} ~result() {} bool await_ready() noexcept { return false; } decltype(auto) await_resume() noexcept { return std::move(value); } template void return_value(T&& val) noexcept { new (&value) Value(std::forward(val)); } }; template <> struct result { bool await_ready() noexcept { return false; } void await_resume() noexcept {} void return_void() noexcept {} }; /// Schedule a coroutine to run in the event loop. inline void schedule(std::coroutine_handle<> handle) { assert(handle && "schedule: invalid coroutine handle"); uv_async_t* async = new uv_async_t(); async->data = handle.address(); uv_async_init(loop, async, [](uv_async_t* async) { auto handle = std::coroutine_handle<>::from_address(async->data); handle.resume(); uv_close((uv_handle_t*)async, [](uv_handle_t* handle) { delete handle; }); }); uv_async_send(async); } template void schedule(promise promise) { schedule(promise.handle()); } template > struct task_awaiter : result { std::remove_cvref_t task; std::coroutine_handle<> caller; void await_suspend(std::coroutine_handle<> caller) noexcept { static_assert(!std::is_reference_v, "return type must not be a reference"); this->caller = caller; uv_work_t* work = new uv_work_t{.data = this}; uv_queue_work( loop, work, [](uv_work_t* work) { auto& awaiter = uv_cast(work); if constexpr(!std::is_void_v) { new (&awaiter.value) Ret(awaiter.task()); } else { awaiter.task(); } }, [](uv_work_t* work, int status) { auto& awaiter = uv_cast(work); awaiter.caller.resume(); delete work; }); } }; /// Schedule a task to run in the thread pool. template task_awaiter schedule_task(Task&& task) { return {{}, std::forward(task)}; } struct sleep_awaiter { std::chrono::milliseconds ms; std::coroutine_handle<> caller; bool await_ready() noexcept { return ms.count() == 0; } void await_suspend(std::coroutine_handle<> caller) noexcept { this->caller = caller; uv_timer_t* timer = new uv_timer_t{.data = this}; uv_timer_init(loop, timer); uv_timer_start( timer, [](uv_timer_t* timer) { auto& awaiter = uv_cast(timer); awaiter.caller.resume(); uv_close((uv_handle_t*)timer, [](uv_handle_t* handle) { delete handle; }); }, ms.count(), 0); } void await_resume() noexcept {} }; inline sleep_awaiter sleep(std::chrono::milliseconds ms) { return {ms}; } struct fs_awaiter { std::string path; std::coroutine_handle<> caller; std::chrono::system_clock::time_point modified; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> caller) noexcept { this->caller = caller; uv_fs_t* req = new uv_fs_t{.data = this}; uv_fs_open(loop, req, path.c_str(), UV_FS_O_RDONLY, 0, [](uv_fs_t* req) { auto& awaiter = uv_cast(req); if(req->result < 0) { llvm::errs() << "Error: " << uv_strerror(req->result) << "\n"; awaiter.caller.resume(); delete req; return; } uv_fs_close(loop, req, req->result, [](uv_fs_t* req) { auto& awaiter = uv_cast(req); uv_timespec_t& mtime = req->statbuf.st_mtim; using namespace std::chrono; awaiter.modified = system_clock::time_point(seconds(mtime.tv_sec) + nanoseconds(mtime.tv_nsec)); awaiter.caller.resume(); delete req; }); }); } decltype(auto) await_resume() noexcept { return modified; } }; inline fs_awaiter modified_time(llvm::StringRef path) { return {path.str()}; } template int run(Ps&&... ps) { (schedule(std::forward(ps)), ...); return uv_run(loop, UV_RUN_DEFAULT); } template struct awaiter { using coroutine_handle = typename promise::coroutine_handle; coroutine_handle h; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { h.promise().caller = handle; async::schedule(h); } decltype(auto) await_resume() noexcept requires (!std::is_void_v) { return std::move(h.promise().value); } void await_resume() noexcept requires (std::is_void_v) {} }; template struct promise_type : result { std::coroutine_handle<> caller; auto get_return_object() { return promise{std::coroutine_handle::from_promise(*this)}; } void unhandled_exception() { std::abort(); } std::suspend_always initial_suspend() { return {}; } auto final_suspend() noexcept { struct FinalAwaiter { std::coroutine_handle<> caller; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> self) noexcept { self.destroy(); /// If this coroutine is a top-level coroutine, its caller is empty. if(!caller) { return; } /// Schedule the caller to run in the event loop. async::schedule(caller); } void await_resume() noexcept {} }; return FinalAwaiter{.caller = caller}; } }; template class promise { public: using promise_type = async::promise_type; using coroutine_handle = std::coroutine_handle; promise(coroutine_handle handle) : h(handle) {} promise(const promise&) = delete; promise(promise&& other) : h(other.h) { other.h = nullptr; } awaiter operator co_await() const noexcept { return {h}; } coroutine_handle handle() const noexcept { return h; } private: coroutine_handle h; }; } // namespace clice::async