Nearly all of our lldb-server tests have two flavours (lldb-server and debugserver). Each of them is tagged with an appropriate decorator, and each of them starts with a call to a matching "init" method. The init calls are mandatory, and it's not possible to meaningfully combine them with a different decorator. This patch leverages the existing decorators to also tag the tests with the appropriate debug server tag, similar to how we do with debug info flavours. This allows us to make the "init" calls from inside the common setUp method.
305 lines
12 KiB
Python
305 lines
12 KiB
Python
|
|
import json
|
|
import re
|
|
|
|
import gdbremote_testcase
|
|
from lldbsuite.test.decorators import *
|
|
from lldbsuite.test.lldbtest import *
|
|
from lldbsuite.test import lldbutil
|
|
|
|
class TestGdbRemoteThreadsInStopReply(
|
|
gdbremote_testcase.GdbRemoteTestCaseBase):
|
|
|
|
mydir = TestBase.compute_mydir(__file__)
|
|
|
|
ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
|
|
"read packet: $QListThreadsInStopReply#21",
|
|
"send packet: $OK#00",
|
|
]
|
|
|
|
def gather_stop_reply_fields(self, post_startup_log_lines, thread_count,
|
|
field_names):
|
|
# Set up the inferior args.
|
|
inferior_args = []
|
|
for i in range(thread_count - 1):
|
|
inferior_args.append("thread:new")
|
|
inferior_args.append("sleep:10")
|
|
procs = self.prep_debug_monitor_and_inferior(
|
|
inferior_args=inferior_args)
|
|
|
|
self.add_register_info_collection_packets()
|
|
self.add_process_info_collection_packets()
|
|
|
|
# Assumes test_sequence has anything added needed to setup the initial state.
|
|
# (Like optionally enabling QThreadsInStopReply.)
|
|
if post_startup_log_lines:
|
|
self.test_sequence.add_log_lines(post_startup_log_lines, True)
|
|
self.test_sequence.add_log_lines([
|
|
"read packet: $c#63"
|
|
], True)
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
hw_info = self.parse_hw_info(context)
|
|
|
|
# Give threads time to start up, then break.
|
|
time.sleep(self.DEFAULT_SLEEP)
|
|
self.reset_test_sequence()
|
|
self.test_sequence.add_log_lines(
|
|
[
|
|
"read packet: {}".format(
|
|
chr(3)),
|
|
{
|
|
"direction": "send",
|
|
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
|
|
"capture": {
|
|
1: "stop_result",
|
|
2: "key_vals_text"}},
|
|
],
|
|
True)
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
|
|
# Wait until all threads have started.
|
|
threads = self.wait_for_thread_count(thread_count)
|
|
self.assertIsNotNone(threads)
|
|
self.assertEqual(len(threads), thread_count)
|
|
|
|
# Run, then stop the process, grab the stop reply content.
|
|
self.reset_test_sequence()
|
|
self.test_sequence.add_log_lines(["read packet: $c#63",
|
|
"read packet: {}".format(chr(3)),
|
|
{"direction": "send",
|
|
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
|
|
"capture": {1: "stop_result",
|
|
2: "key_vals_text"}},
|
|
],
|
|
True)
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
|
|
# Parse the stop reply contents.
|
|
key_vals_text = context.get("key_vals_text")
|
|
self.assertIsNotNone(key_vals_text)
|
|
kv_dict = self.parse_key_val_dict(key_vals_text)
|
|
self.assertIsNotNone(kv_dict)
|
|
|
|
result = dict();
|
|
result["pc_register"] = hw_info["pc_register"]
|
|
result["little_endian"] = hw_info["little_endian"]
|
|
for key_field in field_names:
|
|
result[key_field] = kv_dict.get(key_field)
|
|
|
|
return result
|
|
|
|
def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):
|
|
# Pull out threads from stop response.
|
|
stop_reply_threads_text = self.gather_stop_reply_fields(
|
|
post_startup_log_lines, thread_count, ["threads"])["threads"]
|
|
if stop_reply_threads_text:
|
|
return [int(thread_id, 16)
|
|
for thread_id in stop_reply_threads_text.split(",")]
|
|
else:
|
|
return []
|
|
|
|
def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count):
|
|
results = self.gather_stop_reply_fields( post_startup_log_lines,
|
|
thread_count, ["threads", "thread-pcs"])
|
|
if not results:
|
|
return []
|
|
|
|
threads_text = results["threads"]
|
|
pcs_text = results["thread-pcs"]
|
|
thread_ids = threads_text.split(",")
|
|
pcs = pcs_text.split(",")
|
|
self.assertEquals(len(thread_ids), len(pcs))
|
|
|
|
thread_pcs = dict()
|
|
for i in range(0, len(pcs)):
|
|
thread_pcs[int(thread_ids[i], 16)] = pcs[i]
|
|
|
|
result = dict()
|
|
result["thread_pcs"] = thread_pcs
|
|
result["pc_register"] = results["pc_register"]
|
|
result["little_endian"] = results["little_endian"]
|
|
return result
|
|
|
|
def switch_endian(self, egg):
|
|
return "".join(reversed(re.findall("..", egg)))
|
|
|
|
def parse_hw_info(self, context):
|
|
self.assertIsNotNone(context)
|
|
process_info = self.parse_process_info_response(context)
|
|
endian = process_info.get("endian")
|
|
reg_info = self.parse_register_info_packets(context)
|
|
(pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
|
|
|
|
hw_info = dict()
|
|
hw_info["pc_register"] = pc_lldb_reg_index
|
|
hw_info["little_endian"] = (endian == "little")
|
|
return hw_info
|
|
|
|
def gather_threads_info_pcs(self, pc_register, little_endian):
|
|
self.reset_test_sequence()
|
|
self.test_sequence.add_log_lines(
|
|
[
|
|
"read packet: $jThreadsInfo#c1",
|
|
{
|
|
"direction": "send",
|
|
"regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
|
|
"capture": {
|
|
1: "threads_info"}},
|
|
],
|
|
True)
|
|
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
threads_info = context.get("threads_info")
|
|
register = str(pc_register)
|
|
# The jThreadsInfo response is not valid JSON data, so we have to
|
|
# clean it up first.
|
|
jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
|
|
thread_pcs = dict()
|
|
for thread_info in jthreads_info:
|
|
tid = thread_info["tid"]
|
|
pc = thread_info["registers"][register]
|
|
thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
|
|
|
|
return thread_pcs
|
|
|
|
def QListThreadsInStopReply_supported(self):
|
|
procs = self.prep_debug_monitor_and_inferior()
|
|
self.test_sequence.add_log_lines(
|
|
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
|
|
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
|
|
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
|
|
@debugserver_test
|
|
def test_QListThreadsInStopReply_supported_debugserver(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.QListThreadsInStopReply_supported()
|
|
|
|
@llgs_test
|
|
def test_QListThreadsInStopReply_supported_llgs(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.QListThreadsInStopReply_supported()
|
|
|
|
def stop_reply_reports_multiple_threads(self, thread_count):
|
|
# Gather threads from stop notification when QThreadsInStopReply is
|
|
# enabled.
|
|
stop_reply_threads = self.gather_stop_reply_threads(
|
|
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
|
|
self.assertEqual(len(stop_reply_threads), thread_count)
|
|
|
|
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
|
|
@debugserver_test
|
|
def test_stop_reply_reports_multiple_threads_debugserver(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_reports_multiple_threads(5)
|
|
|
|
# In current implementation of llgs on Windows, as a response to '\x03' packet, the debugger
|
|
# of the native process will trigger a call to DebugBreakProcess that will create a new thread
|
|
# to handle the exception debug event. So one more stop thread will be notified to the
|
|
# delegate, e.g. llgs. So tests below to assert the stop threads number will all fail.
|
|
@expectedFailureAll(oslist=["windows"])
|
|
@skipIfNetBSD
|
|
@llgs_test
|
|
def test_stop_reply_reports_multiple_threads_llgs(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_reports_multiple_threads(5)
|
|
|
|
def no_QListThreadsInStopReply_supplies_no_threads(self, thread_count):
|
|
# Gather threads from stop notification when QThreadsInStopReply is not
|
|
# enabled.
|
|
stop_reply_threads = self.gather_stop_reply_threads(None, thread_count)
|
|
self.assertEqual(len(stop_reply_threads), 0)
|
|
|
|
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
|
|
@debugserver_test
|
|
def test_no_QListThreadsInStopReply_supplies_no_threads_debugserver(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.no_QListThreadsInStopReply_supplies_no_threads(5)
|
|
|
|
@expectedFailureAll(oslist=["windows"])
|
|
@skipIfNetBSD
|
|
@llgs_test
|
|
def test_no_QListThreadsInStopReply_supplies_no_threads_llgs(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.no_QListThreadsInStopReply_supplies_no_threads(5)
|
|
|
|
def stop_reply_reports_correct_threads(self, thread_count):
|
|
# Gather threads from stop notification when QThreadsInStopReply is
|
|
# enabled.
|
|
stop_reply_threads = self.gather_stop_reply_threads(
|
|
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
|
|
self.assertEqual(len(stop_reply_threads), thread_count)
|
|
|
|
# Gather threads from q{f,s}ThreadInfo.
|
|
self.reset_test_sequence()
|
|
self.add_threadinfo_collection_packets()
|
|
|
|
context = self.expect_gdbremote_sequence()
|
|
self.assertIsNotNone(context)
|
|
|
|
threads = self.parse_threadinfo_packets(context)
|
|
self.assertIsNotNone(threads)
|
|
self.assertEqual(len(threads), thread_count)
|
|
|
|
# Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
|
|
for tid in threads:
|
|
self.assertTrue(tid in stop_reply_threads)
|
|
|
|
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
|
|
@debugserver_test
|
|
def test_stop_reply_reports_correct_threads_debugserver(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_reports_correct_threads(5)
|
|
|
|
@expectedFailureAll(oslist=["windows"])
|
|
@skipIfNetBSD
|
|
@llgs_test
|
|
def test_stop_reply_reports_correct_threads_llgs(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_reports_correct_threads(5)
|
|
|
|
def stop_reply_contains_thread_pcs(self, thread_count):
|
|
results = self.gather_stop_reply_pcs(
|
|
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
|
|
stop_reply_pcs = results["thread_pcs"]
|
|
pc_register = results["pc_register"]
|
|
little_endian = results["little_endian"]
|
|
self.assertEqual(len(stop_reply_pcs), thread_count)
|
|
|
|
threads_info_pcs = self.gather_threads_info_pcs(pc_register,
|
|
little_endian)
|
|
|
|
self.assertEqual(len(threads_info_pcs), thread_count)
|
|
for thread_id in stop_reply_pcs:
|
|
self.assertTrue(thread_id in threads_info_pcs)
|
|
self.assertTrue(int(stop_reply_pcs[thread_id], 16)
|
|
== int(threads_info_pcs[thread_id], 16))
|
|
|
|
@expectedFailureAll(oslist=["windows"])
|
|
@skipIfNetBSD
|
|
@llgs_test
|
|
def test_stop_reply_contains_thread_pcs_llgs(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_contains_thread_pcs(5)
|
|
|
|
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
|
|
@debugserver_test
|
|
def test_stop_reply_contains_thread_pcs_debugserver(self):
|
|
self.build()
|
|
self.set_inferior_startup_launch()
|
|
self.stop_reply_contains_thread_pcs(5)
|