Some update.

This commit is contained in:
ykiko
2024-11-19 21:43:49 +08:00
parent cd1dfca2f5
commit e1c9c70f54
17 changed files with 894 additions and 211 deletions

View File

@@ -6,115 +6,202 @@
namespace clice {
static uv_loop_t* loop;
static uv_idle_t idle;
static uv_stream_t* writer;
namespace {
void schedule() {}
class MessageBuffer {
public:
MessageBuffer() = default;
void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
static llvm::SmallString<4096> buffer;
buffer.resize_for_overwrite(suggested_size);
buf->base = buffer.data();
buf->len = suggested_size;
}
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
if(nread > 0) {
llvm::outs() << "Received from stdin: " << llvm::StringRef(buf->base, nread);
} else if(nread < 0) {
if(nread != UV_EOF) {
fprintf(stderr, "Error reading from stdin: %s\n", uv_strerror(nread));
}
uv_close((uv_handle_t*)stream, NULL);
void append(llvm::StringRef message) {
buffer += message;
}
}
void on_write(uv_write_t* req, int status) {
if(status < 0) {
fprintf(stderr, "Write error: %s\n", uv_strerror(status));
} else {
printf("Write completed successfully.\n");
llvm::StringRef peek() {
llvm::StringRef str = buffer;
std::size_t length = 0;
if(str.consume_front("Content-Length: ") && !str.consumeInteger(10, length) &&
str.consume_front("\r\n\r\n") && str.size() >= length) {
auto result = str.substr(0, length);
pos = str.end() - buffer.begin();
return result;
}
return {};
}
free(req);
void consume() {
buffer.erase(buffer.begin(), buffer.begin() + pos);
pos = 0;
}
private:
std::size_t pos;
llvm::SmallString<4096> buffer;
};
} // namespace
void Server::run(llvm::unique_function<void()> callback) {
this->callback = std::move(callback);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
void write(llvm::StringRef message) {
/// FIXME:
static uv_write_t write_req;
uv_buf_t buf = uv_buf_init((char*)message.data(), message.size());
uv_write(&write_req, writer, &buf, 1, on_write);
}
Server::Server(const config::ServerOption& option) {
uv_stream_t* init_socket(const char* address, unsigned int port) {
static uv_tcp_t server;
static uv_tcp_t client;
static uv_loop_t* loop = uv_default_loop();
static uv_idle_t idle;
uv_tcp_init(loop, &server);
uv_tcp_init(loop, &client);
struct sockaddr_in addr;
uv_ip4_addr(address, port, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*)&server, 128, [](uv_stream_t* server, int status) {
if(status < 0) {
fprintf(stderr, "New connection error\n");
return;
}
if(uv_accept(server, (uv_stream_t*)&client) == 0) {
printf("Client connected.\n");
uv_read_start((uv_stream_t*)&client, on_alloc_buffer, on_read);
} else {
uv_close((uv_handle_t*)&client, NULL);
}
/// start the idle loop.
uv_idle_init(loop, &idle);
idle.data = this;
uv_idle_start(&idle, [](uv_idle_t* handle) {
auto& server = *static_cast<Server*>(handle->data);
server.callback();
});
if(r) {
fprintf(stderr, "Listen error: %s\n", uv_strerror(r));
}
static auto on_alloc = +[](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
static llvm::SmallString<4096> buffer;
buffer.resize_for_overwrite(suggested_size);
buf->base = buffer.data();
buf->len = suggested_size;
};
return (uv_stream_t*)&client;
}
static auto on_read = [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
if(nread > 0) {
static MessageBuffer messageBuffer;
messageBuffer.append({buf->base, static_cast<std::size_t>(nread)});
if(auto message = messageBuffer.peek(); !message.empty()) {
auto& server = *static_cast<Server*>(stream->data);
if(auto json = json::parse(message)) {
server.messages.emplace_back(*json);
messageBuffer.consume();
} else {
llvm::errs() << "JSON PARSE ERROR " << json.takeError() << "\n";
}
}
} else if(nread < 0) {
if(nread != UV_EOF) {
fprintf(stderr, "Error reading from stdin: %s\n", uv_strerror(nread));
}
uv_close((uv_handle_t*)stream, NULL);
}
};
uv_stream_t* init_pipe() {
static uv_pipe_t stdin_pipe;
static uv_pipe_t stdout_pipe;
uv_pipe_init(loop, &stdin_pipe, 0);
uv_pipe_init(loop, &stdout_pipe, 0);
uv_pipe_open(&stdin_pipe, 0);
uv_pipe_open(&stdout_pipe, 1);
uv_read_start((uv_stream_t*)&stdin_pipe, on_alloc_buffer, on_read);
return (uv_stream_t*)&stdout_pipe;
}
int run(int argc, const char** argv) {
loop = uv_default_loop();
uv_idle_init(loop, &idle);
uv_idle_start(&idle, [](uv_idle_t* handle) { schedule(); });
/// read config file.
config::parse(argc, argv);
/// init writer.
const auto& option = config::server();
/// initialize the socket or pipe.
if(option.mode == "socket") {
writer = init_socket(option.address.c_str(), option.port);
static uv_tcp_t server;
static uv_tcp_t client;
uv_tcp_init(loop, &server);
uv_tcp_init(loop, &client);
server.data = this;
client.data = this;
struct sockaddr_in addr;
/// FIXME:
uv_ip4_addr(option.address.c_str(), option.port, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*)&server, 128, [](uv_stream_t* server, int status) {
if(status < 0) {
llvm::errs() << "New connection error\n";
return;
}
if(uv_accept(server, (uv_stream_t*)&client) == 0) {
llvm::errs() << "Client connected.\n";
uv_read_start((uv_stream_t*)&client, on_alloc, on_read);
} else {
uv_close((uv_handle_t*)&client, NULL);
}
});
if(r) {
llvm::errs() << "Listen error: " << uv_strerror(r) << "\n";
}
writer = (uv_stream_t*)&client;
} else if(option.mode == "pipe") {
writer = init_pipe();
static uv_pipe_t stdin_pipe;
static uv_pipe_t stdout_pipe;
uv_pipe_init(loop, &stdin_pipe, 0);
uv_pipe_init(loop, &stdout_pipe, 0);
stdin_pipe.data = this;
stdout_pipe.data = this;
uv_pipe_open(&stdin_pipe, 0);
uv_pipe_open(&stdout_pipe, 1);
uv_read_start((uv_stream_t*)&stdin_pipe, on_alloc, on_read);
writer = (uv_stream_t*)&stdout_pipe;
} else {
llvm::errs() << "Unknown mode: " << option.mode << "\n";
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
void Server::write(llvm::StringRef message) {
uv_write_t* req = new uv_write_t();
uv_buf_t buf = uv_buf_init(const_cast<char*>(message.data()), message.size());
auto on_write = [](uv_write_t* req, int status) {
if(status < 0) {
llvm::errs() << "Write error: " << uv_strerror(status) << "\n";
}
delete req;
};
uv_write(req, static_cast<uv_stream_t*>(writer), &buf, 1, on_write);
}
void Server::request() {}
void Server::response(json::Value id, json::Value result) {
json::Value response = json::Object{
{"jsonrpc", "2.0" },
{"id", id },
{"result", result},
};
struct Buffer {
llvm::SmallString<128> header;
llvm::SmallString<4096> message;
};
Buffer* buffer = new Buffer();
llvm::raw_svector_ostream os(buffer->message);
os << response;
llvm::raw_svector_ostream sos(buffer->header);
sos << "Content-Length: " << buffer->message.size() << "\r\n\r\n";
uv_buf_t bufs[2] = {
uv_buf_init(buffer->header.data(), buffer->header.size()),
uv_buf_init(buffer->message.data(), buffer->message.size()),
};
uv_write_t* req = new uv_write_t();
req->data = buffer;
auto on_write = [](uv_write_t* req, int status) {
if(status < 0) {
llvm::errs() << "Write error: " << uv_strerror(status) << "\n";
}
delete static_cast<Buffer*>(req->data);
delete req;
};
int r = uv_write(req, static_cast<uv_stream_t*>(writer), bufs, 2, on_write);
if(r) {
llvm::errs() << "Write error: " << uv_strerror(r) << "\n";
}
}
void Server::notify() {}
void Server::error() {}
} // namespace clice