[lldb] Adding pipe support to lldb_private::MainLoopWindows. (#145621)

This updates MainLoopWindows to support events for reading from a pipe
(both anonymous and named pipes) as well as sockets.

This unifies both handle types using `WSAWaitForMultipleEvents` which
can listen to both sockets and handles for change events.

This should allow us to unify how we handle watching pipes/sockets on
Windows and Posix systems.

We can extend this in the future if we want to support watching other
types, like files or even other events like a process life time.

---------

Co-authored-by: Pavel Labath <pavel@labath.sk>
This commit is contained in:
John Harrison
2025-07-01 08:59:27 -07:00
committed by GitHub
parent 47e28d9cd1
commit 1a7b7e24bc
10 changed files with 263 additions and 61 deletions

View File

@@ -274,7 +274,11 @@ int NativeFile::GetDescriptor() const {
}
IOObject::WaitableHandle NativeFile::GetWaitableHandle() {
#ifdef _WIN32
return (HANDLE)_get_osfhandle(GetDescriptor());
#else
return GetDescriptor();
#endif
}
FILE *NativeFile::GetStream() {

View File

@@ -42,7 +42,7 @@ ReadFull(IOObject &descriptor, size_t length,
if (timeout && timeout_supported) {
SelectHelper sh;
sh.SetTimeout(*timeout);
sh.FDSetRead(descriptor.GetWaitableHandle());
sh.FDSetRead((lldb::socket_t)descriptor.GetWaitableHandle());
Status status = sh.Select();
if (status.Fail()) {
// Convert timeouts into a specific error.

View File

@@ -313,8 +313,7 @@ Socket::DecodeHostAndPort(llvm::StringRef host_and_port) {
}
IOObject::WaitableHandle Socket::GetWaitableHandle() {
// TODO: On Windows, use WSAEventSelect
return m_socket;
return (IOObject::WaitableHandle)m_socket;
}
Status Socket::Read(void *buf, size_t &num_bytes) {

View File

@@ -276,7 +276,7 @@ size_t ConnectionFileDescriptor::Read(void *dst, size_t dst_len,
"%p ConnectionFileDescriptor::Read() fd = %" PRIu64
", dst = %p, dst_len = %" PRIu64 ") => %" PRIu64 ", error = %s",
static_cast<void *>(this),
static_cast<uint64_t>(m_io_sp->GetWaitableHandle()),
static_cast<file_t>(m_io_sp->GetWaitableHandle()),
static_cast<void *>(dst), static_cast<uint64_t>(dst_len),
static_cast<uint64_t>(bytes_read), error.AsCString());
}
@@ -380,7 +380,7 @@ size_t ConnectionFileDescriptor::Write(const void *src, size_t src_len,
"%p ConnectionFileDescriptor::Write(fd = %" PRIu64
", src = %p, src_len = %" PRIu64 ") => %" PRIu64 " (error = %s)",
static_cast<void *>(this),
static_cast<uint64_t>(m_io_sp->GetWaitableHandle()),
static_cast<file_t>(m_io_sp->GetWaitableHandle()),
static_cast<const void *>(src), static_cast<uint64_t>(src_len),
static_cast<uint64_t>(bytes_sent), error.AsCString());
}
@@ -451,7 +451,8 @@ ConnectionFileDescriptor::BytesAvailable(const Timeout<std::micro> &timeout,
if (timeout)
select_helper.SetTimeout(*timeout);
select_helper.FDSetRead(handle);
// FIXME: Migrate to MainLoop.
select_helper.FDSetRead((lldb::socket_t)handle);
#if defined(_WIN32)
// select() won't accept pipes on Windows. The entire Windows codepath
// needs to be converted over to using WaitForMultipleObjects and event
@@ -493,7 +494,7 @@ ConnectionFileDescriptor::BytesAvailable(const Timeout<std::micro> &timeout,
break; // Lets keep reading to until we timeout
}
} else {
if (select_helper.FDIsSetRead(handle))
if (select_helper.FDIsSetRead((lldb::socket_t)handle))
return eConnectionStatusSuccess;
if (select_helper.FDIsSetRead(pipe_fd)) {

View File

@@ -8,8 +8,11 @@
#include "lldb/Host/windows/MainLoopWindows.h"
#include "lldb/Host/Config.h"
#include "lldb/Host/Socket.h"
#include "lldb/Utility/Status.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Casting.h"
#include "llvm/Support/WindowsError.h"
#include <algorithm>
#include <cassert>
#include <cerrno>
@@ -31,6 +34,122 @@ static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
return ceil<milliseconds>(dur).count();
}
namespace {
class PipeEvent : public MainLoopWindows::IOEvent {
public:
explicit PipeEvent(HANDLE handle)
: IOEvent((IOObject::WaitableHandle)CreateEventW(
NULL, /*bManualReset=*/FALSE,
/*bInitialState=*/FALSE, NULL)),
m_handle(handle), m_ready(CreateEventW(NULL, /*bManualReset=*/FALSE,
/*bInitialState=*/FALSE, NULL)) {
assert(m_event && m_ready);
}
~PipeEvent() override {
if (m_monitor_thread.joinable()) {
m_stopped = true;
SetEvent(m_ready);
// Keep trying to cancel ReadFile() until the thread exits.
do {
CancelIoEx((HANDLE)m_handle, /*lpOverlapped=*/NULL);
} while (WaitForSingleObject(m_monitor_thread.native_handle(), 1) ==
WAIT_TIMEOUT);
m_monitor_thread.join();
}
CloseHandle((HANDLE)m_event);
CloseHandle(m_ready);
}
void WillPoll() override {
if (!m_monitor_thread.joinable())
m_monitor_thread = std::thread(&PipeEvent::Monitor, this);
}
void Disarm() override { SetEvent(m_ready); }
/// Monitors the handle performing a zero byte read to determine when data is
/// avaiable.
void Monitor() {
do {
char buf[1];
DWORD bytes_read = 0;
OVERLAPPED ov = {0};
// Block on a 0-byte read; this will only resume when data is
// available in the pipe. The pipe must be PIPE_WAIT or this thread
// will spin.
BOOL success =
ReadFile(m_handle, buf, /*nNumberOfBytesToRead=*/0, &bytes_read, &ov);
DWORD bytes_available = 0;
DWORD err = GetLastError();
if (!success && err == ERROR_IO_PENDING) {
success = GetOverlappedResult(m_handle, &ov, &bytes_read,
/*bWait=*/TRUE);
err = GetLastError();
}
if (success) {
success =
PeekNamedPipe(m_handle, NULL, 0, NULL, &bytes_available, NULL);
err = GetLastError();
}
if (success) {
if (bytes_available == 0) {
// This can happen with a zero-byte write. Try again.
continue;
}
} else if (err == ERROR_NO_DATA) {
// The pipe is nonblocking. Try again.
Sleep(0);
continue;
} else if (err == ERROR_OPERATION_ABORTED) {
// Read may have been cancelled, try again.
continue;
}
SetEvent((HANDLE)m_event);
// Wait until the current read is consumed before doing the next read.
WaitForSingleObject(m_ready, INFINITE);
} while (!m_stopped);
}
private:
HANDLE m_handle;
HANDLE m_ready;
std::thread m_monitor_thread;
std::atomic<bool> m_stopped = false;
};
class SocketEvent : public MainLoopWindows::IOEvent {
public:
explicit SocketEvent(SOCKET socket)
: IOEvent((IOObject::WaitableHandle)WSACreateEvent()), m_socket(socket) {
assert(event != WSA_INVALID_EVENT);
}
~SocketEvent() override { WSACloseEvent((HANDLE)m_event); }
void WillPoll() {
int result = WSAEventSelect(m_socket, (HANDLE)m_event,
FD_READ | FD_ACCEPT | FD_CLOSE);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);
}
void DidPoll() {
int result = WSAEventSelect(m_socket, WSA_INVALID_EVENT, 0);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);
}
void Disarm() override { WSAResetEvent((HANDLE)m_event); }
SOCKET m_socket;
};
} // namespace
MainLoopWindows::MainLoopWindows() {
m_interrupt_event = WSACreateEvent();
assert(m_interrupt_event != WSA_INVALID_EVENT);
@@ -44,14 +163,11 @@ MainLoopWindows::~MainLoopWindows() {
}
llvm::Expected<size_t> MainLoopWindows::Poll() {
std::vector<WSAEVENT> events;
std::vector<HANDLE> events;
events.reserve(m_read_fds.size() + 1);
for (auto &[fd, info] : m_read_fds) {
int result = WSAEventSelect(fd, info.event, FD_READ | FD_ACCEPT | FD_CLOSE);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);
events.push_back(info.event);
for (auto &[_, fd_info] : m_read_fds) {
fd_info.event->WillPoll();
events.push_back((HANDLE)fd_info.event->GetHandle());
}
events.push_back(m_interrupt_event);
@@ -59,11 +175,8 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
ToTimeout(GetNextWakeupTime()), FALSE);
for (auto &fd : m_read_fds) {
int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
assert(result == 0);
UNUSED_IF_ASSERT_DISABLED(result);
}
for (auto &[_, fd_info] : m_read_fds)
fd_info.event->DidPoll();
if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
return result - WSA_WAIT_EVENT_0;
@@ -83,28 +196,25 @@ MainLoopWindows::RegisterReadObject(const IOObjectSP &object_sp,
error = Status::FromErrorString("IO object is not valid.");
return nullptr;
}
if (object_sp->GetFdType() != IOObject::eFDTypeSocket) {
error = Status::FromErrorString(
"MainLoopWindows: non-socket types unsupported on Windows");
return nullptr;
}
WSAEVENT event = WSACreateEvent();
if (event == WSA_INVALID_EVENT) {
error =
Status::FromErrorStringWithFormat("Cannot create monitoring event.");
return nullptr;
}
IOObject::WaitableHandle waitable_handle = object_sp->GetWaitableHandle();
assert(waitable_handle != IOObject::kInvalidHandleValue);
const bool inserted =
m_read_fds
.try_emplace(object_sp->GetWaitableHandle(), FdInfo{event, callback})
.second;
if (!inserted) {
WSACloseEvent(event);
if (m_read_fds.find(waitable_handle) != m_read_fds.end()) {
error = Status::FromErrorStringWithFormat(
"File descriptor %d already monitored.",
object_sp->GetWaitableHandle());
"File descriptor %d already monitored.", waitable_handle);
return nullptr;
}
if (object_sp->GetFdType() == IOObject::eFDTypeSocket)
m_read_fds[waitable_handle] = {
std::make_unique<SocketEvent>((SOCKET)waitable_handle), callback};
else if (GetFileType(waitable_handle) == FILE_TYPE_PIPE)
m_read_fds[waitable_handle] = {
std::make_unique<PipeEvent>((HANDLE)waitable_handle), callback};
else {
error = Status::FromErrorStringWithFormat("Unsupported file type %d",
GetFileType(waitable_handle));
return nullptr;
}
@@ -114,18 +224,9 @@ MainLoopWindows::RegisterReadObject(const IOObjectSP &object_sp,
void MainLoopWindows::UnregisterReadObject(IOObject::WaitableHandle handle) {
auto it = m_read_fds.find(handle);
assert(it != m_read_fds.end());
BOOL result = WSACloseEvent(it->second.event);
assert(result == TRUE);
UNUSED_IF_ASSERT_DISABLED(result);
m_read_fds.erase(it);
}
void MainLoopWindows::ProcessReadObject(IOObject::WaitableHandle handle) {
auto it = m_read_fds.find(handle);
if (it != m_read_fds.end())
it->second.callback(*this); // Do the work
}
Status MainLoopWindows::Run() {
m_terminate_request = false;
@@ -138,8 +239,8 @@ Status MainLoopWindows::Run() {
if (*signaled_event < m_read_fds.size()) {
auto &KV = *std::next(m_read_fds.begin(), *signaled_event);
WSAResetEvent(KV.second.event);
ProcessReadObject(KV.first);
KV.second.event->Disarm();
KV.second.callback(*this); // Do the work.
} else {
assert(*signaled_event == m_read_fds.size());
WSAResetEvent(m_interrupt_event);

View File

@@ -8,7 +8,16 @@
#include "lldb/Utility/IOObject.h"
#ifdef _WIN32
#include "lldb/Host/windows/windows.h"
#endif
using namespace lldb_private;
#ifdef _WIN32
const IOObject::WaitableHandle IOObject::kInvalidHandleValue =
INVALID_HANDLE_VALUE;
#else
const IOObject::WaitableHandle IOObject::kInvalidHandleValue = -1;
#endif
IOObject::~IOObject() = default;