#pragma once #include "uv.h" #include "llvm/ADT/SmallVector.h" #include "llvm/ADT/FunctionExtras.h" #include "llvm/Support/raw_ostream.h" #include "Server/Logger.h" #include "Support/JSON.h" #include #include #include #include #include #include #include #include #include namespace clice::async { template struct promise; extern uv_loop_t* loop; #define uv_check_call(func, ...) \ if(int error = func(__VA_ARGS__); error < 0) { \ log::fatal("An error occurred in " #func ": {0}", uv_strerror(error)); \ } template T& uv_cast(U* u) { assert(u && u->data && "uv_cast: invalid uv handle"); return *static_cast(u->data); } using Callback = llvm::unique_function(json::Value)>; void start_server(Callback callback); void start_server(Callback callback, const char* ip, unsigned int port); void write(json::Value id, json::Value result); template struct result { std::optional value; result() {} ~result() {} bool await_ready() noexcept { return false; } decltype(auto) await_resume() noexcept { assert(value.has_value() && "await_resume: no value"); return std::move(*value); } template void return_value(T&& val) noexcept { assert(!value.has_value() && "return_value: value already set"); value.emplace(std::forward(val)); } }; template <> struct result { bool await_ready() noexcept { return false; } void await_resume() noexcept {} void return_void() noexcept {} }; /// Schedule a coroutine to run in the event loop. inline void schedule(std::coroutine_handle<> handle) { assert(handle && "schedule: invalid coroutine handle"); uv_async_t* async = new uv_async_t(); async->data = handle.address(); auto callback = [](uv_async_t* async) { auto handle = std::coroutine_handle<>::from_address(async->data); handle.resume(); uv_close(reinterpret_cast(async), nullptr); delete async; }; uv_check_call(uv_async_init, loop, async, callback); uv_check_call(uv_async_send, async); } template void schedule(const promise& promise) { schedule(promise.handle()); } template > struct task_awaiter : result { std::remove_cvref_t task; uv_work_t work; std::coroutine_handle<> caller; void await_suspend(std::coroutine_handle<> caller) noexcept { static_assert(!std::is_reference_v, "return type must not be a reference"); this->caller = caller; work.data = this; auto work_cb = [](uv_work_t* work) { auto& awaiter = uv_cast(work); if constexpr(!std::is_void_v) { awaiter.value.emplace(awaiter.task()); } else { awaiter.task(); } }; auto after_work_cb = [](uv_work_t* work, int status) { auto& awaiter = uv_cast(work); awaiter.caller.resume(); }; uv_check_call(uv_queue_work, loop, &work, work_cb, after_work_cb); } }; /// Schedule a task to run in the thread pool. template task_awaiter schedule_task(Task&& task) { return {{}, std::forward(task)}; } struct sleep_awaiter { std::chrono::milliseconds ms; std::coroutine_handle<> caller; bool await_ready() noexcept { return ms.count() == 0; } void await_suspend(std::coroutine_handle<> caller) noexcept { this->caller = caller; uv_timer_t* timer = new uv_timer_t{.data = this}; uv_check_call(uv_timer_init, loop, timer); auto callback = [](uv_timer_t* timer) { auto& awaiter = uv_cast(timer); awaiter.caller.resume(); uv_close((uv_handle_t*)timer, [](uv_handle_t* handle) { delete handle; }); }; uv_check_call(uv_timer_start, timer, callback, ms.count(), 0); } void await_resume() noexcept {} }; inline sleep_awaiter sleep(std::chrono::milliseconds ms) { return {ms}; } struct fs_awaiter { std::string path; std::coroutine_handle<> caller; std::chrono::system_clock::time_point modified; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> caller) noexcept { this->caller = caller; uv_fs_t* req = new uv_fs_t{.data = this}; auto callback = [](uv_fs_t* req) { auto& awaiter = uv_cast(req); if(req->result < 0) { log::fatal("An error occurred while opening file: {0}", uv_strerror(req->result)); awaiter.caller.resume(); delete req; return; } uv_timespec_t& mtime = req->statbuf.st_mtim; using namespace std::chrono; awaiter.modified = system_clock::time_point(seconds(mtime.tv_sec) + nanoseconds(mtime.tv_nsec)); awaiter.caller.resume(); delete req; }; uv_check_call(uv_fs_open, loop, req, path.c_str(), UV_FS_O_RDONLY, 0, callback); } decltype(auto) await_resume() noexcept { return modified; } }; inline fs_awaiter modified_time(llvm::StringRef path) { return {path.str()}; } template int run(Ps&&... ps) { (schedule(std::forward(ps)), ...); return uv_run(loop, UV_RUN_DEFAULT); } template struct awaiter { using coroutine_handle = typename promise::coroutine_handle; coroutine_handle h; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { h.promise().caller = handle; async::schedule(h); } decltype(auto) await_resume() noexcept { if constexpr(!std::is_void_v) { auto value = std::move(*h.promise().value); h.destroy(); return value; } else { h.destroy(); } } }; template struct final_awaiter { std::coroutine_handle<> caller; bool await_ready() noexcept { return false; } template void await_suspend(std::coroutine_handle self) noexcept { /// If this coroutine is a top-level coroutine, its caller is empty. if(!caller) { return; } /// Schedule the caller to run in the event loop. async::schedule(caller); } void await_resume() noexcept {} }; template struct promise_type : result { std::coroutine_handle<> caller; auto get_return_object() { return promise{std::coroutine_handle::from_promise(*this)}; } void unhandled_exception() { std::abort(); } std::suspend_always initial_suspend() { return {}; } auto final_suspend() noexcept { return final_awaiter{caller}; } }; template class promise { public: using promise_type = async::promise_type; using coroutine_handle = std::coroutine_handle; promise(coroutine_handle handle) : h(handle) {} promise(const promise&) = delete; promise(promise&& other) : h(other.h) { other.h = nullptr; } awaiter operator co_await() const noexcept { return {h}; } coroutine_handle handle() const noexcept { return h; } bool done() const noexcept { return h.done(); } void destroy() noexcept { assert(h && h.done() && "destroy: invalid coroutine handle"); h.destroy(); } private: coroutine_handle h; }; /// Suspend current coroutine and invoke the callback with its handle. /// Note the callback invoked before the coroutine is suspended. So it is /// template auto suspend(Callback&& callback) { struct suspend_awaiter { Callback callback; bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> handle) noexcept { callback(handle); } void await_resume() noexcept {} }; return suspend_awaiter{std::forward(callback)}; } } // namespace clice::async