## Summary ### Preamble completeness check - `is_preamble_complete()` in `scan.cpp`: checks whether `#include`/`import`/`export module` directives in the preamble region are syntactically complete (have closing `>`/`"`/`;`) - `ensure_pch` defers PCH rebuild when preamble is incomplete (user still typing), reuses old PCH instead of failing ### #include / import completion - Master intercepts completion requests in `#include "..."` / `#include <...>` / `import ...` contexts before forwarding to worker - `complete_include()`: searches include paths (from compile args via `SearchConfig`) using `DirListingCache`, supports quoted/angled/multi-level paths - `complete_import()`: filters `path_to_module` map by prefix - Word boundary checks prevent false matches (e.g. `important` not treated as `import`) ### Detached compile task (rapid-edit fix) - Compile operations (`ensure_deps` + `send_stateful` + `publish_diagnostics`) run as detached tasks via `loop.schedule()`, independent of the LSP request coroutine chain - LSP `$/cancelRequest` can no longer kill in-flight compilations — previously, cancellation would destroy the `ensure_compiled` coroutine frame, leaving `doc.compiling` permanently set and hanging all subsequent requests - `CompileGuard` RAII ensures `doc.compiling` is always cleaned up even if the detached task fails - Stale feature requests (where `ast_dirty` became true after compile finished) are dropped before forwarding to worker ### Other fixes - `signal(SIGPIPE, SIG_IGN)` on POSIX: prevents server crash when LSP client disconnects mid-write - `CompilationUnitRef::file_path()` / `deps()`: null-check `FileEntryRef` to prevent segfault on invalid FileID - `stateless_worker.cpp`: log BuildPCH diagnostic errors for debuggability - Default worker counts changed to 2 stateful + 3 stateless - `logging_dir` default changed to `.clice/logs` in config ### Tests - 19 unit tests for `is_preamble_complete` (incomplete `#include`, `import`, `export module`, mixed cases) - Integration tests: `test_include_completion.py` (5 tests), `test_import_completion.py` (4 tests), `test_rapid_edit.py` (2 tests), `test_pch.py` (4 new tests) - Smoke test: `rapid_edit.jsonl` — recorded VSCode session with 40 rapid edits + 61 cancel requests ## Test plan - [x] Unit tests: 463 passed - [x] Integration tests: 104 passed - [x] Smoke test (rapid_edit.jsonl): PASS - [x] Manual VSCode testing with `#include <iostream>` project 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
236 lines
6.7 KiB
C++
236 lines
6.7 KiB
C++
#include "server/worker_pool.h"
|
|
|
|
#include <csignal>
|
|
#include <string>
|
|
|
|
#include "eventide/ipc/transport.h"
|
|
#include "support/logging.h"
|
|
|
|
namespace clice {
|
|
|
|
namespace {
|
|
|
|
/// Coroutine that drains a worker's stderr pipe.
|
|
/// Workers write their own log files, so this only captures unexpected output
|
|
/// (crash stacktraces, assertion failures, etc.) that bypasses spdlog.
|
|
et::task<> drain_stderr(et::pipe stderr_pipe, std::string prefix) {
|
|
std::string buffer;
|
|
while(true) {
|
|
auto result = co_await stderr_pipe.read();
|
|
if(!result.has_value()) {
|
|
break;
|
|
}
|
|
auto& chunk = result.value();
|
|
if(chunk.empty())
|
|
break;
|
|
|
|
buffer += chunk;
|
|
|
|
std::size_t pos = 0;
|
|
while(true) {
|
|
auto nl = buffer.find('\n', pos);
|
|
if(nl == std::string::npos)
|
|
break;
|
|
auto line = buffer.substr(pos, nl - pos);
|
|
if(!line.empty()) {
|
|
LOG_DEBUG("{} {}", prefix, line);
|
|
}
|
|
pos = nl + 1;
|
|
}
|
|
buffer.erase(0, pos);
|
|
}
|
|
|
|
if(!buffer.empty()) {
|
|
LOG_DEBUG("{} {}", prefix, buffer);
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
bool WorkerPool::spawn_worker(const std::string& self_path,
|
|
bool stateful,
|
|
std::uint64_t memory_limit) {
|
|
auto& workers = stateful ? stateful_workers : stateless_workers;
|
|
auto worker_index = workers.size();
|
|
std::string worker_name = std::string(stateful ? "SF-" : "SL-") + std::to_string(worker_index);
|
|
|
|
et::process::options opts;
|
|
opts.file = self_path;
|
|
if(stateful) {
|
|
opts.args = {self_path,
|
|
"--mode",
|
|
"stateful-worker",
|
|
"--worker-memory-limit",
|
|
std::to_string(memory_limit)};
|
|
} else {
|
|
opts.args = {self_path, "--mode", "stateless-worker"};
|
|
}
|
|
|
|
opts.args.push_back("--worker-name");
|
|
opts.args.push_back(worker_name);
|
|
|
|
if(!log_dir_.empty()) {
|
|
opts.args.push_back("--log-dir");
|
|
opts.args.push_back(log_dir_);
|
|
}
|
|
|
|
opts.streams = {
|
|
et::process::stdio::pipe(true, false), // stdin: child reads
|
|
et::process::stdio::pipe(false, true), // stdout: child writes
|
|
et::process::stdio::pipe(false, true), // stderr: child writes
|
|
};
|
|
|
|
auto result = et::process::spawn(opts, loop);
|
|
if(!result) {
|
|
LOG_ERROR("Failed to spawn {} worker: {}",
|
|
stateful ? "stateful" : "stateless",
|
|
result.error().message());
|
|
return false;
|
|
}
|
|
|
|
auto& spawn = *result;
|
|
|
|
// StreamTransport: input = child's stdout (parent reads), output = child's stdin (parent
|
|
// writes)
|
|
auto transport = std::make_unique<et::ipc::StreamTransport>(std::move(spawn.stdout_pipe),
|
|
std::move(spawn.stdin_pipe));
|
|
auto peer = std::make_unique<et::ipc::BincodePeer>(loop, std::move(transport));
|
|
|
|
// Schedule stderr log collection
|
|
std::string prefix = "[" + worker_name + "]";
|
|
loop.schedule(drain_stderr(std::move(spawn.stderr_pipe), prefix));
|
|
|
|
workers.push_back(WorkerProcess{
|
|
.proc = std::move(spawn.proc),
|
|
.peer = std::move(peer),
|
|
.owned_documents = 0,
|
|
});
|
|
|
|
auto& w = workers.back();
|
|
loop.schedule(w.peer->run());
|
|
|
|
return true;
|
|
}
|
|
|
|
bool WorkerPool::start(const WorkerPoolOptions& options) {
|
|
log_dir_ = options.log_dir;
|
|
|
|
for(std::uint32_t i = 0; i < options.stateless_count; ++i) {
|
|
if(!spawn_worker(options.self_path, false, 0)) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
for(std::uint32_t i = 0; i < options.stateful_count; ++i) {
|
|
if(!spawn_worker(options.self_path, true, options.worker_memory_limit)) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// Register evicted notification handler for each stateful worker
|
|
for(std::size_t i = 0; i < stateful_workers.size(); ++i) {
|
|
stateful_workers[i].peer->on_notification([this](const worker::EvictedParams& params) {
|
|
if(on_evicted) {
|
|
on_evicted(params.path);
|
|
}
|
|
});
|
|
}
|
|
|
|
LOG_INFO("WorkerPool started: {} stateless, {} stateful workers",
|
|
stateless_workers.size(),
|
|
stateful_workers.size());
|
|
return true;
|
|
}
|
|
|
|
et::task<> WorkerPool::stop() {
|
|
LOG_INFO("WorkerPool stopping...");
|
|
|
|
// Close output pipes to signal workers to exit gracefully
|
|
for(auto& w: stateless_workers) {
|
|
w.peer->close_output();
|
|
}
|
|
for(auto& w: stateful_workers) {
|
|
w.peer->close_output();
|
|
}
|
|
|
|
// Send SIGTERM to all workers
|
|
for(auto& w: stateless_workers) {
|
|
w.proc.kill(SIGTERM);
|
|
}
|
|
for(auto& w: stateful_workers) {
|
|
w.proc.kill(SIGTERM);
|
|
}
|
|
|
|
// Wait for all worker processes to exit
|
|
for(auto& w: stateless_workers) {
|
|
co_await w.proc.wait();
|
|
}
|
|
for(auto& w: stateful_workers) {
|
|
co_await w.proc.wait();
|
|
}
|
|
|
|
LOG_INFO("WorkerPool stopped");
|
|
}
|
|
|
|
std::size_t WorkerPool::assign_worker(std::uint32_t path_id) {
|
|
auto it = owner.find(path_id);
|
|
if(it != owner.end()) {
|
|
// Already assigned; touch LRU
|
|
auto lru_it = owner_lru_index.find(path_id);
|
|
if(lru_it != owner_lru_index.end()) {
|
|
owner_lru.erase(lru_it->second);
|
|
}
|
|
owner_lru.push_front(path_id);
|
|
owner_lru_index[path_id] = owner_lru.begin();
|
|
return it->second;
|
|
}
|
|
|
|
// New assignment: pick the least-loaded worker
|
|
auto selected = pick_least_loaded();
|
|
owner[path_id] = selected;
|
|
stateful_workers[selected].owned_documents++;
|
|
owner_lru.push_front(path_id);
|
|
owner_lru_index[path_id] = owner_lru.begin();
|
|
return selected;
|
|
}
|
|
|
|
std::size_t WorkerPool::pick_least_loaded() {
|
|
std::size_t best = 0;
|
|
for(std::size_t i = 1; i < stateful_workers.size(); ++i) {
|
|
if(stateful_workers[i].owned_documents < stateful_workers[best].owned_documents) {
|
|
best = i;
|
|
}
|
|
}
|
|
return best;
|
|
}
|
|
|
|
void WorkerPool::remove_owner(std::uint32_t path_id) {
|
|
auto it = owner.find(path_id);
|
|
if(it == owner.end())
|
|
return;
|
|
|
|
auto worker_idx = it->second;
|
|
stateful_workers[worker_idx].owned_documents--;
|
|
owner.erase(it);
|
|
|
|
auto lru_it = owner_lru_index.find(path_id);
|
|
if(lru_it != owner_lru_index.end()) {
|
|
owner_lru.erase(lru_it->second);
|
|
owner_lru_index.erase(lru_it);
|
|
}
|
|
}
|
|
|
|
void WorkerPool::clear_owner(std::size_t worker_index) {
|
|
llvm::SmallVector<std::uint32_t> to_remove;
|
|
for(auto& [pid, widx]: owner) {
|
|
if(widx == worker_index) {
|
|
to_remove.push_back(pid);
|
|
}
|
|
}
|
|
for(auto pid: to_remove) {
|
|
remove_owner(pid);
|
|
}
|
|
}
|
|
|
|
} // namespace clice
|