28
28
# POSSIBILITY OF SUCH DAMAGE.
29
29
30
30
from clearpath_config .common .types .platform import Platform
31
- from clearpath_generator_common .common import BaseGenerator
32
31
from clearpath_tests .mobility_test import MobilityTestNode
33
32
from clearpath_tests .test_node import ClearpathTestResult
34
33
from clearpath_tests .tf import ConfigurableTransformListener
34
+ from clearpath_tests .timer import Timeout
35
+
35
36
from geometry_msgs .msg import Vector3Stamped
36
37
import rclpy
37
- from rclpy .duration import Duration
38
38
from rclpy .qos import qos_profile_sensor_data
39
39
from sensor_msgs .msg import Imu
40
40
from tf2_geometry_msgs import do_transform_vector3
@@ -140,14 +140,14 @@ def run_test(self):
140
140
self .start ()
141
141
142
142
# wait until we get the first IMU message or 10s passes
143
- start_time = self .get_clock ().now ()
144
- timeout_duration = Duration (seconds = 10 )
143
+ timeout = Timeout (self , 10.0 )
145
144
while (
146
145
self .latest_imu is None
147
- and self . get_clock (). now () - start_time <= timeout_duration
146
+ and not timeout . elapsed
148
147
and not self .test_error
149
148
):
150
- rclpy .spin_once (self )
149
+ rclpy .spin_once (self , timeout_sec = 1.0 )
150
+ timeout .abort ()
151
151
152
152
if self .test_error :
153
153
self .get_logger ().warning (f'Test aborted due to an error: { self .test_error_msg } ' )
@@ -162,13 +162,14 @@ def run_test(self):
162
162
163
163
# start turning, but wait 1s for us to get up to speed before recording data
164
164
self .cmd_vel .twist .angular .z = self .max_speed
165
- startup_wait = Duration (seconds = 1.0 )
166
- start_time = self .get_clock ().now ()
165
+ timeout = Timeout (self , 1.0 )
167
166
while (
168
167
not self .test_error
169
- and self . get_clock (). now () - start_time <= startup_wait
168
+ and not timeout . elapsed
170
169
):
171
- rclpy .spin_once (self )
170
+ rclpy .spin_once (self , timeout_sec = 1.0 )
171
+ timeout .abort ()
172
+
172
173
if self .test_error :
173
174
self .record_data = False
174
175
self .cmd_vel .twist .angular .z = 0.0
@@ -177,14 +178,15 @@ def run_test(self):
177
178
178
179
# record data for 10s
179
180
self .record_data = True
180
- test_wait = Duration (seconds = 10 )
181
- start_time = self .get_clock ().now ()
181
+ timeout = Timeout (self , 10.0 )
182
182
while (
183
183
not self .test_error
184
- and self . get_clock (). now () - start_time <= test_wait
184
+ and not timeout . elapsed
185
185
):
186
- rclpy .spin_once (self )
186
+ rclpy .spin_once (self , timeout_sec = 1.0 )
187
+ timeout .abort ()
187
188
self .record_data = False
189
+
188
190
if self .test_error :
189
191
self .cmd_vel .twist .angular .z = 0.0
190
192
self .get_logger ().warning (f'Test aborted due to an error: { self .test_error_msg } ' )
@@ -193,10 +195,10 @@ def run_test(self):
193
195
# stop driving
194
196
# wait 1s to ensure we publish the command
195
197
self .cmd_vel .twist .angular .z = 0.0
196
- test_wait = Duration ( seconds = 1 )
197
- start_time = self . get_clock (). now ()
198
- while self . get_clock (). now () - start_time <= test_wait :
199
- rclpy . spin_once ( self )
198
+ timeout = Timeout ( self , 1.0 )
199
+ while not timeout . elapsed :
200
+ rclpy . spin_once ( self , timeout_sec = 1.0 )
201
+ timeout . abort ( )
200
202
201
203
# process the results
202
204
results = self .test_results
0 commit comments