diff --git a/src/server/service/agent_client.cpp b/src/server/service/agent_client.cpp index 69c7a81b..d51b610e 100644 --- a/src/server/service/agent_client.cpp +++ b/src/server/service/agent_client.cpp @@ -778,9 +778,10 @@ AgentClient::AgentClient(MasterServer& server, kota::ipc::JsonPeer& peer) : co_return result; }); - peer.on_notification([&srv](const ShutdownParams&) { + peer.on_notification([this, &srv](const ShutdownParams&) { LOG_INFO("agentic/shutdown received, shutting down"); srv.schedule_shutdown(); + this->peer.close(); }); } diff --git a/src/server/service/lsp_client.cpp b/src/server/service/lsp_client.cpp index 1607b60a..b7bbc369 100644 --- a/src/server/service/lsp_client.cpp +++ b/src/server/service/lsp_client.cpp @@ -156,6 +156,7 @@ LSPClient::LSPClient(MasterServer& server, kota::ipc::JsonPeer& peer) : server(s peer.on_notification([this]([[maybe_unused]] const protocol::ExitParams& params) { LOG_INFO("Exit notification received"); this->server.schedule_shutdown(); + this->peer.close(); }); peer.on_notification([this](const protocol::DidOpenTextDocumentParams& params) { diff --git a/src/server/service/master_server.cpp b/src/server/service/master_server.cpp index 9e8b9010..656a3245 100644 --- a/src/server/service/master_server.cpp +++ b/src/server/service/master_server.cpp @@ -212,10 +212,10 @@ void MasterServer::schedule_shutdown() { workspace.save_cache(); shutdown_event.set(); - loop.schedule([this]() -> kota::task<> { - co_await kota::when_all(indexer.stop(), compiler.stop(), pool.stop()); - loop.stop(); - }()); + loop.schedule([](MasterServer& server) -> kota::task<> { + co_await kota::when_all(server.indexer.stop(), server.compiler.stop(), server.pool.stop()); + server.loop.stop(); + }(*this)); } void MasterServer::load_workspace() { @@ -355,35 +355,53 @@ static kota::task<> accept_connections(MasterServer& server, std::list& connections) { auto& loop = kota::event_loop::current(); kota::task_group<> connection_group(loop); - bool lsp_registered = false; - while(true) { - auto conn = co_await acceptor.accept(); - if(!conn.has_value()) - break; + co_await kota::when_all( + [](MasterServer& server, + kota::tcp::acceptor& acceptor, + bool register_lsp, + std::list& connections, + kota::task_group<>& connection_group) -> kota::task<> { + auto& loop = kota::event_loop::current(); + bool lsp_registered = false; - LOG_INFO("Client connected"); + while(true) { + auto conn = co_await acceptor.accept(); + if(!conn.has_value()) + break; - auto transport = std::make_unique(std::move(*conn)); - auto peer = std::make_unique(loop, std::move(transport)); + LOG_INFO("Client connected"); - std::unique_ptr lsp; - if(register_lsp && !lsp_registered) { - lsp = std::make_unique(server, *peer); - lsp_registered = true; - } - auto agent = std::make_unique(server, *peer); + auto transport = std::make_unique(std::move(*conn)); + auto peer = std::make_unique(loop, std::move(transport)); - auto* peer_ptr = peer.get(); - auto it = connections.emplace(connections.end(), - Connection{ - .peer = std::move(peer), - .lsp_client = std::move(lsp), - .agent_client = std::move(agent), - }); + std::unique_ptr lsp; + if(register_lsp && !lsp_registered) { + lsp = std::make_unique(server, *peer); + lsp_registered = true; + } + auto agent = std::make_unique(server, *peer); - connection_group.spawn(run_connection(peer_ptr, connections, it)); - } + auto* peer_ptr = peer.get(); + auto it = connections.emplace(connections.end(), + Connection{ + .peer = std::move(peer), + .lsp_client = std::move(lsp), + .agent_client = std::move(agent), + }); + + connection_group.spawn(run_connection(peer_ptr, connections, it)); + } + }(server, acceptor, register_lsp, connections, connection_group), + [](MasterServer& server, + kota::tcp::acceptor& acceptor, + std::list& connections) -> kota::task<> { + co_await server.get_shutdown_event().wait(); + acceptor.stop(); + for(auto& conn: connections) { + conn.peer->close(); + } + }(server, acceptor, connections)); co_await connection_group.join(); } @@ -411,6 +429,10 @@ int run_server_mode(const ServerOptions& opts) { kota::ipc::JsonPeer lsp_peer(loop, std::move(final_transport)); LSPClient lsp_client(server, lsp_peer); + loop.schedule([](MasterServer& server, kota::ipc::JsonPeer& peer) -> kota::task<> { + co_await server.get_shutdown_event().wait(); + peer.close(); + }(server, lsp_peer)); if(opts.port > 0) { auto acceptor = kota::tcp::listen(opts.host, opts.port, {}, loop); diff --git a/src/server/worker/worker_pool.cpp b/src/server/worker/worker_pool.cpp index 2253e1b8..07adc35d 100644 --- a/src/server/worker/worker_pool.cpp +++ b/src/server/worker/worker_pool.cpp @@ -116,6 +116,9 @@ bool WorkerPool::start(const WorkerPoolOptions& options) { options_ = options; log_dir_ = options.log_dir; + stateless_workers.reserve(options.stateless_count); + stateful_workers.reserve(options.stateful_count); + for(std::uint32_t i = 0; i < options.stateless_count; ++i) { if(!spawn_worker(options.self_path, false, 0)) { return false; @@ -229,10 +232,10 @@ void WorkerPool::clear_owner(std::size_t worker_index) { kota::task<> WorkerPool::monitor_worker(std::size_t index, bool stateful) { auto& workers = stateful ? stateful_workers : stateless_workers; - auto& w = workers[index]; auto name = std::string(stateful ? "SF-" : "SL-") + std::to_string(index); - auto result = co_await w.proc.wait(); + auto result = co_await workers[index].proc.wait(); + auto& w = workers[index]; w.alive = false; if(shutting_down_) diff --git a/tests/conftest.py b/tests/conftest.py index 05890cc1..ecca7a0a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -185,44 +185,94 @@ async def make_client(executable: Path, workspace: Path) -> CliceClient: return c +SANITIZER_MARKERS = ( + "AddressSanitizer", + "LeakSanitizer", + "MemorySanitizer", + "ThreadSanitizer", + "UndefinedBehaviorSanitizer", + "==ERROR:", + "runtime error:", +) + + +def _server_stderr_excerpt(stderr_text: str) -> str: + interesting = [ + line + for line in stderr_text.splitlines() + if "[warn]" in line + or "[error]" in line + or "Sanitizer" in line + or "==ERROR:" in line + or "runtime error:" in line + ] + return "\n".join(interesting[-80:]) + + +async def assert_server_exited_cleanly(server, timeout: float = 3.0) -> None: + failures: list[str] = [] + + if server is None: + return + + if server.returncode is None: + try: + await asyncio.wait_for(server.wait(), timeout=timeout) + except asyncio.TimeoutError: + server.kill() + await server.wait() + failures.append(f"server did not exit within {timeout:g}s after shutdown") + + print(f"[server] exit code: {server.returncode}", flush=True) + + stderr_text = "" + if server.stderr: + try: + stderr_data = await asyncio.wait_for(server.stderr.read(), timeout=2.0) + stderr_text = stderr_data.decode("utf-8", errors="replace") + except Exception as exc: + failures.append(f"failed to collect server stderr: {exc!r}") + + for line in _server_stderr_excerpt(stderr_text).splitlines(): + print(f"[server] {line}", flush=True) + + if server.returncode != 0: + failures.append(f"server exited with code {server.returncode}") + + if any(marker in stderr_text for marker in SANITIZER_MARKERS): + failures.append("server stderr contains sanitizer/runtime error output") + + if failures: + excerpt = _server_stderr_excerpt(stderr_text) + if excerpt: + failures.append("server stderr excerpt:\n" + excerpt) + pytest.fail("\n".join(failures)) + + async def _shutdown_client(c: CliceClient) -> None: """Gracefully shut down a client, force-kill if needed.""" + server = getattr(c, "_server", None) + try: await asyncio.wait_for(c.shutdown_async(None), timeout=3.0) except Exception: pass + try: c.exit(None) except Exception: pass - await asyncio.sleep(0.3) - if hasattr(c, "_server") and c._server is not None and c._server.returncode is None: - c._server.kill() - try: - server = getattr(c, "_server", None) - if server: - if server.returncode is not None: - print(f"[server] exit code: {server.returncode}", flush=True) - if server.stderr: - stderr_data = await asyncio.wait_for(server.stderr.read(), timeout=2.0) - if stderr_data: - for line in stderr_data.decode( - "utf-8", errors="replace" - ).splitlines(): - if "[warn]" in line or "[error]" in line or "Sanitizer" in line: - print(f"[server] {line}", flush=True) - except Exception: - pass - - try: - c._stop_event.set() - for task in c._async_tasks: - task.cancel() - await asyncio.sleep(0.1) - except Exception: - pass + await assert_server_exited_cleanly(server) + finally: + try: + c._stop_event.set() + for task in c._async_tasks: + task.cancel() + await asyncio.sleep(0.1) + except Exception: + pass shutdown_client = _shutdown_client # Public alias for multi-session tests diff --git a/tests/integration/agentic/test_agentic.py b/tests/integration/agentic/test_agentic.py index 549647a4..75ad0815 100644 --- a/tests/integration/agentic/test_agentic.py +++ b/tests/integration/agentic/test_agentic.py @@ -530,7 +530,7 @@ async def test_rpc_impact_analysis_unknown(indexed_agentic, workspace): async def test_shutdown_during_indexing(executable, tmp_path): """Shutdown during active background indexing must exit cleanly.""" from tests.integration.utils.client import CliceClient - from tests.conftest import _find_free_port + from tests.conftest import _find_free_port, assert_server_exited_cleanly workspace = tmp_path / "ws" workspace.mkdir() @@ -560,33 +560,38 @@ async def test_shutdown_during_indexing(executable, tmp_path): c = CliceClient() await c.start_io(*cmd) - init_options = { - "project": { - "cache_dir": str(workspace / ".clice"), - "idle_timeout_ms": 0, - } - } - await c.initialize(workspace, initialization_options=init_options) - - # Give indexing a moment to start, then send shutdown - await asyncio.sleep(0.5) - - rpc = AgenticRpcClient(host, port) - body = json.dumps({"jsonrpc": "2.0", "method": "agentic/shutdown", "params": {}}) - rpc.sock.sendall(f"Content-Length: {len(body)}\r\n\r\n{body}".encode()) - rpc.sock.settimeout(5) try: - rpc.sock.recv(4096) - except (socket.timeout, OSError): - pass - rpc.sock.close() + init_options = { + "project": { + "cache_dir": str(workspace / ".clice"), + "idle_timeout_ms": 0, + } + } + try: + await c.initialize(workspace, initialization_options=init_options) + except Exception: + if c._server.returncode is not None: + await assert_server_exited_cleanly(c._server, timeout=15.0) + raise - for _ in range(30): - if c._server.returncode is not None: - break + # Give indexing a moment to start, then send shutdown await asyncio.sleep(0.5) - assert c._server.returncode is not None, "Server did not exit after shutdown" - assert c._server.returncode >= 0, ( - f"Server crashed with signal {-c._server.returncode}" - ) + rpc = AgenticRpcClient(host, port) + body = json.dumps( + {"jsonrpc": "2.0", "method": "agentic/shutdown", "params": {}} + ) + rpc.sock.sendall(f"Content-Length: {len(body)}\r\n\r\n{body}".encode()) + rpc.sock.settimeout(5) + try: + rpc.sock.recv(4096) + except (socket.timeout, OSError): + pass + rpc.sock.close() + + await assert_server_exited_cleanly(c._server, timeout=15.0) + finally: + c._stop_event.set() + for task in c._async_tasks: + task.cancel() + await asyncio.sleep(0.1)