This infrastructure has proven its worth, so give it a more prominent place. My immediate motivation for this is the desire to reuse this infrastructure for qemu platform testing, but I believe this move makes sense independently of that. Moving this code to the packages tree will allow us to add more structure to the gdb client tests -- currently they are all crammed into the same test folder, as that was the only way they could access this code. I'm splitting the code into two parts while moving it. The first one contains just the generic gdb protocol wrappers, while the other one contains the unit test glue. The reason for that is that for qemu testing, I need to run the gdb code in a separate process, so I will only be using the first part there. Differential Revision: https://reviews.llvm.org/D113893
493 lines
18 KiB
Python
493 lines
18 KiB
Python
import lldb
|
|
import binascii
|
|
import os.path
|
|
from lldbsuite.test.lldbtest import *
|
|
from lldbsuite.test.decorators import *
|
|
from lldbsuite.test.gdbclientutils import *
|
|
from lldbsuite.test.lldbgdbclient import GDBRemoteTestBase
|
|
|
|
|
|
class TestGDBRemoteClient(GDBRemoteTestBase):
    """Tests of LLDB's gdb-remote client against a scripted mock server.

    Each test installs a ``MockGDBServerResponder`` subclass on
    ``self.server``, connects a target to it and then checks either the
    resulting API state or the packets recorded in the responder's
    ``packetLog``.
    """

    mydir = TestBase.compute_mydir(__file__)

    class gPacketResponder(MockGDBServerResponder):
        """Responder describing eight 64-bit registers whose value is zero."""

        # qRegisterInfo replies, indexed by register number.
        registers = [
            "name:rax;bitsize:64;offset:0;encoding:uint;format:hex;set:General Purpose Registers;ehframe:0;dwarf:0;",
            "name:rbx;bitsize:64;offset:8;encoding:uint;format:hex;set:General Purpose Registers;ehframe:3;dwarf:3;",
            "name:rcx;bitsize:64;offset:16;encoding:uint;format:hex;set:General Purpose Registers;ehframe:2;dwarf:2;generic:arg4;",
            "name:rdx;bitsize:64;offset:24;encoding:uint;format:hex;set:General Purpose Registers;ehframe:1;dwarf:1;generic:arg3;",
            "name:rdi;bitsize:64;offset:32;encoding:uint;format:hex;set:General Purpose Registers;ehframe:5;dwarf:5;generic:arg1;",
            "name:rsi;bitsize:64;offset:40;encoding:uint;format:hex;set:General Purpose Registers;ehframe:4;dwarf:4;generic:arg2;",
            "name:rbp;bitsize:64;offset:48;encoding:uint;format:hex;set:General Purpose Registers;ehframe:6;dwarf:6;generic:fp;",
            "name:rsp;bitsize:64;offset:56;encoding:uint;format:hex;set:General Purpose Registers;ehframe:7;dwarf:7;generic:sp;",
        ]

        def qRegisterInfo(self, num):
            """Return the description of register `num`, or "E45" past the end."""
            try:
                return self.registers[num]
            except IndexError:
                return "E45"

        def readRegisters(self):
            """Return all registers as one string: 16 '0' digits per register."""
            return len(self.registers) * 16 * '0'

        def readRegister(self, register):
            """Return a single register value: 16 '0' hex digits."""
            return "0000000000000000"

    def test_connect(self):
        """Test connecting to a remote gdb server"""
        target = self.createTarget("a.yaml")
        process = self.connect(target)
        self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"])

    def test_attach_fail(self):
        """Test that the error text of a failed vAttach reaches the SBError"""
        error_msg = "mock-error-msg"

        class MyResponder(MockGDBServerResponder):
            # Pretend we don't have any process during the initial queries.
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "OK"  # No threads.

            # Then, when we are asked to attach, error out.
            def vAttach(self, pid):
                # The error code is followed by the hex-encoded message.
                return "E42;" + binascii.hexlify(error_msg.encode()).decode()

        self.server.responder = MyResponder()

        target = self.dbg.CreateTarget("")
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])

        error = lldb.SBError()
        target.AttachToProcessWithID(lldb.SBListener(), 47, error)
        self.assertEqual(error_msg, error.GetCString())

    def test_launch_fail(self):
        """Test that an error reply to the 'A' packet is reported on launch"""
        class MyResponder(MockGDBServerResponder):
            # Pretend we don't have any process during the initial queries.
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "OK"  # No threads.

            # Then, when we are asked to launch, error out.
            def A(self, packet):
                return "E47"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])

        error = lldb.SBError()
        target.Launch(lldb.SBListener(), None, None, None, None, None,
                      None, 0, True, error)
        # 0x47 from the "E47" reply is 71 in decimal.
        self.assertEqual("'A' packet returned an error: 71", error.GetCString())

    def test_read_registers_using_g_packets(self):
        """Test reading registers using 'g' packets"""
        self.dbg.HandleCommand(
            "settings set plugin.process.gdb-remote.use-g-packet-for-reading true")
        # Switch the setting back off once the test is done.
        self.addTearDownHook(lambda:
                             self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false"))
        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.assertEqual(1, self.server.responder.packetLog.count("g"))
        # Start from an empty log so only the reads below are inspected.
        self.server.responder.packetLog = []
        self.read_registers(process)
        # Reading registers should not cause any 'p' packets to be exchanged.
        self.assertEqual([], self._packets_starting_with("p"))

    def test_read_registers_using_p_packets(self):
        """Test reading registers using 'p' packets"""
        self.dbg.HandleCommand(
            "settings set plugin.process.gdb-remote.use-g-packet-for-reading false")
        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.read_registers(process)
        self.assertNotIn("g", self.server.responder.packetLog)
        self.assertGreater(len(self._packets_starting_with("p")), 0)

    def test_write_registers_using_P_packets(self):
        """Test writing registers using 'P' packets (default behavior)"""
        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.write_registers(process)
        self.assertEqual([], self._packets_starting_with("G"))
        self.assertGreater(len(self._packets_starting_with("P")), 0)

    def test_write_registers_using_G_packets(self):
        """Test writing registers using 'G' packets"""

        class MyResponder(self.gPacketResponder):
            def readRegister(self, register):
                # empty string means unsupported
                return ""

        self.server.responder = MyResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.write_registers(process)
        self.assertEqual([], self._packets_starting_with("P"))
        self.assertGreater(len(self._packets_starting_with("G")), 0)

    def _packets_starting_with(self, prefix):
        """Return the logged packets of the current responder that start with `prefix`."""
        return [p for p in self.server.responder.packetLog
                if p.startswith(prefix)]

    def read_registers(self, process):
        """Assert that every register in the first register set reads as zero."""
        self.for_each_gpr(
            process, lambda r: self.assertEqual("0x0000000000000000", r.GetValue()))

    def write_registers(self, process):
        """Write zero to every register in the first register set."""
        self.for_each_gpr(
            process, lambda r: r.SetValueFromCString("0x0000000000000000"))

    def for_each_gpr(self, process, operation):
        """Call `operation` on each register of frame 0 / thread 0's first register set.

        Fails the test if there are no register sets or the first one is empty.
        """
        registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters()
        self.assertGreater(registers.GetSize(), 0)
        regSet = registers[0]
        numChildren = regSet.GetNumChildren()
        self.assertGreater(numChildren, 0)
        for i in range(numChildren):
            operation(regSet.GetChildAtIndex(i))

    def test_launch_A(self):
        """Test launching a process when the server accepts the 'A' packet"""
        class MyResponder(MockGDBServerResponder):
            def __init__(self, *args, **kwargs):
                self.started = False
                super().__init__(*args, **kwargs)

            # No process is reported until the 'A' packet has been received.
            def qC(self):
                if self.started:
                    return "QCp10.10"
                else:
                    return "E42"

            def qfThreadInfo(self):
                if self.started:
                    return "mp10.10"
                else:
                    return "E42"

            def qsThreadInfo(self):
                return "l"

            def A(self, packet):
                self.started = True
                return "OK"

            def qLaunchSuccess(self):
                if self.started:
                    return "OK"
                return "E42"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        # NB: apparently GDB packets are using "/" on Windows too
        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
                                      [lldb.eStateConnected])

        target.Launch(lldb.SBListener(),
                      ["arg1", "arg2", "arg3"],  # argv
                      [],  # envp
                      None,  # stdin_path
                      None,  # stdout_path
                      None,  # stderr_path
                      None,  # working_directory
                      0,  # launch_flags
                      True,  # stop_at_entry
                      lldb.SBError())  # error
        # These checks use the process object obtained from connect() above.
        self.assertTrue(process, PROCESS_IS_VALID)
        # The responder reports pid 0x10, i.e. 16.
        self.assertEqual(process.GetProcessID(), 16)

        self.assertPacketLogContains([
            "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % (
                len(exe_hex), exe_hex),
        ])

    def test_launch_vRun(self):
        """Test launching a process when the server accepts the 'vRun' packet"""
        class MyResponder(MockGDBServerResponder):
            def __init__(self, *args, **kwargs):
                self.started = False
                super().__init__(*args, **kwargs)

            # No process is reported until the 'vRun' packet has been received.
            def qC(self):
                if self.started:
                    return "QCp10.10"
                else:
                    return "E42"

            def qfThreadInfo(self):
                if self.started:
                    return "mp10.10"
                else:
                    return "E42"

            def qsThreadInfo(self):
                return "l"

            def vRun(self, packet):
                self.started = True
                return "T13"

            # The 'A' packet is answered with an error in this scenario.
            def A(self, packet):
                return "E28"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        # NB: apparently GDB packets are using "/" on Windows too
        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
                                      [lldb.eStateConnected])

        process = target.Launch(lldb.SBListener(),
                                ["arg1", "arg2", "arg3"],  # argv
                                [],  # envp
                                None,  # stdin_path
                                None,  # stdout_path
                                None,  # stderr_path
                                None,  # working_directory
                                0,  # launch_flags
                                True,  # stop_at_entry
                                lldb.SBError())  # error
        self.assertTrue(process, PROCESS_IS_VALID)
        # The responder reports pid 0x10, i.e. 16.
        self.assertEqual(process.GetProcessID(), 16)

        self.assertPacketLogContains([
            "vRun;%s;61726731;61726732;61726733" % (exe_hex,)
        ])

    def test_launch_QEnvironment(self):
        """Test which environment variables are sent plain and which hex-encoded"""
        class MyResponder(MockGDBServerResponder):
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "E42"

            def vRun(self, packet):
                self.started = True
                return "E28"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
                                      [lldb.eStateConnected])

        target.Launch(lldb.SBListener(),
                      [],  # argv
                      ["PLAIN=foo",
                       "NEEDSENC=frob$",
                       "NEEDSENC2=fr*ob",
                       "NEEDSENC3=fro}b",
                       "NEEDSENC4=f#rob",
                       "EQUALS=foo=bar",
                       ],  # envp
                      None,  # stdin_path
                      None,  # stdout_path
                      None,  # stderr_path
                      None,  # working_directory
                      0,  # launch_flags
                      True,  # stop_at_entry
                      lldb.SBError())  # error

        # Values containing '$', '*', '}' or '#' are expected hex-encoded;
        # the remaining ones are expected verbatim.
        self.assertPacketLogContains([
            "QEnvironment:PLAIN=foo",
            "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
            "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
            "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
            "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
            "QEnvironment:EQUALS=foo=bar",
        ])

    def test_launch_QEnvironmentHexEncoded_only(self):
        """Test that all variables are hex-encoded when QEnvironment gets an empty reply"""
        class MyResponder(MockGDBServerResponder):
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "E42"

            def vRun(self, packet):
                self.started = True
                return "E28"

            def QEnvironment(self, packet):
                # Reply with an empty string to the plain-text packet.
                return ""

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
                                      [lldb.eStateConnected])

        target.Launch(lldb.SBListener(),
                      [],  # argv
                      ["PLAIN=foo",
                       "NEEDSENC=frob$",
                       "NEEDSENC2=fr*ob",
                       "NEEDSENC3=fro}b",
                       "NEEDSENC4=f#rob",
                       "EQUALS=foo=bar",
                       ],  # envp
                      None,  # stdin_path
                      None,  # stdout_path
                      None,  # stderr_path
                      None,  # working_directory
                      0,  # launch_flags
                      True,  # stop_at_entry
                      lldb.SBError())  # error

        self.assertPacketLogContains([
            "QEnvironmentHexEncoded:504c41494e3d666f6f",
            "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
            "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
            "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
            "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
            "QEnvironmentHexEncoded:455155414c533d666f6f3d626172",
        ])

    def test_detach_no_multiprocess(self):
        """Test that a bare 'D' packet is sent when thread ids carry no pid"""
        class MyResponder(MockGDBServerResponder):
            def __init__(self):
                super().__init__()
                # Holds the detach packet once one has been received.
                self.detached = None

            def qfThreadInfo(self):
                return "10200"

            def D(self, packet):
                self.detached = packet
                return "OK"

        self.server.responder = MyResponder()
        target = self.dbg.CreateTarget('')
        process = self.connect(target)
        process.Detach()
        self.assertEqual(self.server.responder.detached, "D")

    def test_detach_pid(self):
        """Test that the 'D' packet carries the pid when multiprocess is in use"""
        class MyResponder(MockGDBServerResponder):
            def __init__(self, test_case):
                super().__init__()
                # Test case used for assertions made inside the responder.
                self.test_case = test_case
                # Holds the detach packet once one has been received.
                self.detached = None

            def qSupported(self, client_supported):
                self.test_case.assertIn("multiprocess+", client_supported)
                return "multiprocess+;" + super().qSupported(client_supported)

            def qfThreadInfo(self):
                return "mp400.10200"

            def D(self, packet):
                self.detached = packet
                return "OK"

        self.server.responder = MyResponder(self)
        target = self.dbg.CreateTarget('')
        process = self.connect(target)
        process.Detach()
        # Leading zeros in front of the pid are accepted.
        self.assertRegex(self.server.responder.detached, r"D;0*400")

    def test_signal_gdb(self):
        """Test that stop signal 0x0a is described as SIGBUS for this server"""
        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.assertEqual(process.threads[0].GetStopReason(),
                         lldb.eStopReasonSignal)
        self.assertEqual(process.threads[0].GetStopDescription(100),
                         'signal SIGBUS')

    def test_signal_lldb_old(self):
        """Test that stop signal 0x0a is described as SIGUSR1 for this server

        Unlike test_signal_gdb, the responder here also answers qHostInfo and
        QThreadSuffixSupported.
        """
        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+"

            def qHostInfo(self):
                # The hex string decodes to "armv7-unknown-linux-gnu".
                return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;"

            def QThreadSuffixSupported(self):
                return "OK"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.assertEqual(process.threads[0].GetStopReason(),
                         lldb.eStopReasonSignal)
        self.assertEqual(process.threads[0].GetStopDescription(100),
                         'signal SIGUSR1')

    def test_signal_lldb(self):
        """Test that stop signal 0x0a is described as SIGUSR1 with native-signals+

        The responder advertises "native-signals+" in its qSupported reply and
        answers qHostInfo.
        """
        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+;native-signals+"

            def qHostInfo(self):
                # The hex string decodes to "armv7-unknown-linux-gnu".
                return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.assertEqual(process.threads[0].GetStopReason(),
                         lldb.eStopReasonSignal)
        self.assertEqual(process.threads[0].GetStopDescription(100),
                         'signal SIGUSR1')