@@ -68,12 +68,26 @@ def setUp(self):
6868 self .test_node = rclpy .create_node ("test_node" )
6969 self .event_name = "cpu_temperature_is_high"
7070 self .evaluation_time = 0.5 # 500ms
71+ self .discovery_timeout = 5.0 # seconds
7172 self .input_diagnostics_topic = "/diagnostics"
7273 self .output_diagnostics_topic = "/diagnostics/fault_injection"
7374
7475 def tearDown (self ):
7576 self .test_node .destroy_node ()
7677
78+ def _wait_for_discovery (self , publishers , subscribers = None ):
79+ """Wait until all publishers have matched with at least one subscriber and vice versa."""
80+ end_time = time .time () + self .discovery_timeout
81+ while time .time () < end_time :
82+ pubs_matched = all (pub .get_subscription_count () > 0 for pub in publishers )
83+ subs_matched = subscribers is None or all (
84+ self .test_node .count_publishers (topic ) > 0 for topic in subscribers
85+ )
86+ if pubs_matched and subs_matched :
87+ return
88+ rclpy .spin_once (self .test_node , timeout_sec = 0.1 )
89+ self .fail ("Timed out waiting for endpoint discovery" )
90+
7791 @staticmethod
7892 def print_message (stat ):
7993 logger .debug ("===========================" )
@@ -110,6 +124,12 @@ def test_node_link(self):
110124 DiagnosticArray , self .output_diagnostics_topic , lambda msg : msg_buffer .append (msg ), 10
111125 )
112126
127+ # Wait for endpoint discovery before testing.
128+ self ._wait_for_discovery (
129+ publishers = [pub_events , pub_diagnostics ],
130+ subscribers = [self .output_diagnostics_topic ],
131+ )
132+
113133 # Test init state.
114134 # Expect fault_injection_node does not publish diagnostics without input.
115135 end_time = time .time () + self .evaluation_time
@@ -152,13 +172,19 @@ def test_receive_multiple_message_simultaneously(self):
152172 DiagnosticArray , self .output_diagnostics_topic , lambda msg : msg_buffer .append (msg ), 10
153173 )
154174
155- # Call spin_once() so that the publisher publish messages simultaneously
156175 pub_events_1 = self .test_node .create_publisher (SimulationEvents , "/simulation/events" , 10 )
157176 pub_events_2 = self .test_node .create_publisher (SimulationEvents , "/simulation/events" , 10 )
158177 pub_diagnostics = self .test_node .create_publisher (
159178 DiagnosticArray , self .input_diagnostics_topic , 10
160179 )
161- rclpy .spin_once (self .test_node , timeout_sec = 0.1 )
180+
181+ # Wait for endpoint discovery before publishing.
182+ self ._wait_for_discovery (
183+ publishers = [pub_events_1 , pub_events_2 , pub_diagnostics ],
184+ subscribers = [self .output_diagnostics_topic ],
185+ )
186+
187+ # Publish messages simultaneously
162188 input_msg = DiagnosticArray ()
163189 input_msg .status = [
164190 DiagnosticStatus (name = ": CPU Load Average" , level = DiagnosticStatus .OK , message = "OK" ),
0 commit comments