diff --git a/test/launch/simple_test_with_virtual_eth.launch.py b/test/launch/simple_test_with_virtual_eth.launch.py index c50fd8d..1abd4b6 100644 --- a/test/launch/simple_test_with_virtual_eth.launch.py +++ b/test/launch/simple_test_with_virtual_eth.launch.py @@ -1,3 +1,5 @@ +import unittest + from launch import LaunchDescription from launch.actions import ( DeclareLaunchArgument, @@ -13,6 +15,7 @@ ) from launch_ros.substitutions import FindPackageShare import launch_testing +import launch_testing.asserts def generate_test_description(): @@ -31,9 +34,16 @@ def generate_test_description(): description="Path to the setup script for creating virtual ethernet interface", ) ) + declared_arguments.append( + DeclareLaunchArgument( + "test_binary_dir", + description="Path to the test binary directory (passed by add_ros_test)", + ) + ) # Initialize LaunchConfigurations setup_script = LaunchConfiguration("setup_script") + test_binary_dir = LaunchConfiguration("test_binary_dir") # Execute the setup script to create virtual ethernet interface setup_script_action = ExecuteProcess( @@ -42,7 +52,6 @@ def generate_test_description(): shell=True, ) - # Include the elevated permissions 1 DOF launch file elevated_permissions_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( [ @@ -50,24 +59,39 @@ def generate_test_description(): [ FindPackageShare("synapticon_ros2_control"), "launch", - "elevated_permissions_1_dof.launch.py", + "elevated_permissions_2_dof.launch.py", ] ) ] - ) + ), + launch_arguments={"eth_device": "eno0"}.items(), + ) + + # The test executable to run (located in the build directory, passed via test_binary_dir) + test_controller_manager_integration = ExecuteProcess( + cmd=[ + PathJoinSubstitution( + [test_binary_dir, "test_controller_manager_integration"] + ) + ], + output="both", ) - # Delay the elevated permissions launch after setup script completion - delay_elevated_permissions_after_setup = RegisterEventHandler( + # Delay the elevated permissions launch and test after setup script completion + # The test executable will wait for the controller_manager node internally + delay_after_setup = RegisterEventHandler( event_handler=OnProcessExit( target_action=setup_script_action, - on_exit=[elevated_permissions_launch], + on_exit=[ + elevated_permissions_launch, + test_controller_manager_integration, + ], ) ) nodes = [ setup_script_action, - delay_elevated_permissions_after_setup, + delay_after_setup, # In tests where all of the procs under tests terminate themselves, it's necessary # to add a dummy process not under test to keep the launch alive. launch_test # provides a simple launch action that does this: @@ -75,4 +99,34 @@ def generate_test_description(): launch_testing.actions.ReadyToTest(), ] - return LaunchDescription(declared_arguments + nodes) + return ( + LaunchDescription(declared_arguments + nodes), + {"test_controller_manager_integration": test_controller_manager_integration}, + ) + + +class TestControllerManagerIntegration(unittest.TestCase): + """Active test that waits for the test executable to complete.""" + + def test_wait_for_test_completion( + self, test_controller_manager_integration, proc_info + ): + """Wait for the test executable to complete.""" + # Wait for the gtest process to finish + proc_info.assertWaitForShutdown( + process=test_controller_manager_integration, + timeout=30.0, + ) + + +@launch_testing.post_shutdown_test() +class TestProcessOutput(unittest.TestCase): + def test_exit_code(self, proc_info): + """Check that the test executable finished with exit code 0 or was terminated by SIGINT.""" + # Accept both clean exit (0) and SIGINT (-2) since launch_testing sends SIGINT + # to shut down processes, and the gtest may not have exited before receiving it. + launch_testing.asserts.assertExitCodes( + proc_info, + [launch_testing.asserts.EXIT_OK, -2], + process="test_controller_manager_integration", + )