diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 38e6242..b8df830 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -6,10 +6,25 @@ # Unit tests and linting run on every push and PR. # Integration tests only run on PRs to master (expensive, ~7 min). # -# Required secrets for integration tests: -# RUSHTI_TEST_TM1_CONFIG - Full content of a config.ini file with TM1 connection details. -# The file format follows the standard RushTI config.ini format. -# See tests/config.ini.template for the expected structure. +# Required configuration for integration tests: +# +# Variable (non-sensitive connection details): +# RUSHTI_TEST_TM1_CONFIG - config.ini content WITHOUT passwords. +# Example: +# [tm1srv01] +# base_url = https://your-server/tm1/api/tm1 +# user = your_user +# namespace = LDAP +# ssl = true +# verify = true +# async_requests_mode = true +# +# Secret (sensitive credentials only): +# RUSHTI_TEST_TM1_PASSWORD - The password for tm1srv01. +# +# Legacy (still supported): +# Secret: RUSHTI_TEST_TM1_CONFIG - Full config.ini content including password. +# If the variable + secret pair is configured, it takes precedence over the legacy secret. name: Tests @@ -103,15 +118,25 @@ jobs: - name: Check TM1 configuration id: check-tm1 run: | - if [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then + if [ -n "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" ] && [ -n "${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" ]; then + echo "tm1_configured=true" >> $GITHUB_OUTPUT + echo "config_source=variable" >> $GITHUB_OUTPUT + elif [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then echo "tm1_configured=true" >> $GITHUB_OUTPUT + echo "config_source=legacy_secret" >> $GITHUB_OUTPUT else echo "tm1_configured=false" >> $GITHUB_OUTPUT - echo "::warning::TM1 config secret not configured. Integration tests will be skipped." + echo "::warning::TM1 config not configured. Integration tests will be skipped." fi - - name: Create TM1 config file - if: steps.check-tm1.outputs.tm1_configured == 'true' + - name: Create TM1 config file (from variable + secret) + if: steps.check-tm1.outputs.config_source == 'variable' + run: | + echo "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini + echo "password = ${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" >> tests/config.ini + + - name: Create TM1 config file (legacy - full secret) + if: steps.check-tm1.outputs.config_source == 'legacy_secret' run: | echo "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini @@ -129,9 +154,15 @@ jobs: - name: Skip message if: steps.check-tm1.outputs.tm1_configured != 'true' run: | - echo "Integration tests skipped - TM1 config secret not configured" - echo "To enable, add the RUSHTI_TEST_TM1_CONFIG secret to your repository." - echo "The secret should contain the full config.ini content with TM1 connection details." + echo "Integration tests skipped - TM1 config not configured" + echo "" + echo "To enable (recommended):" + echo " 1. Add variable RUSHTI_TEST_TM1_CONFIG with connection details (no password)" + echo " 2. Add secret RUSHTI_TEST_TM1_PASSWORD with the TM1 password" + echo "" + echo "Legacy (still supported):" + echo " Add secret RUSHTI_TEST_TM1_CONFIG with the full config.ini content" + echo "" echo "See tests/config.ini.template for the expected format." lint: diff --git a/docs/features/exclusive-mode.md b/docs/features/exclusive-mode.md index 5724018..9d9a59d 100644 --- a/docs/features/exclusive-mode.md +++ b/docs/features/exclusive-mode.md @@ -181,6 +181,41 @@ timeout = 1800 # Wait up to 30 minutes --- +## Early Session Release + +When a workflow spans multiple TM1 instances, RushTI automatically releases sessions from instances that have no remaining tasks — without waiting for the entire workflow to finish. This is especially valuable in exclusive mode, where holding an idle session blocks other RushTI instances from accessing that server. + +### Example + +Consider a workflow with 100 tasks: 5 tasks on `tm1-finance` (10 seconds) and 95 tasks on `tm1-reporting` (30 minutes). + +| Behavior | `tm1-finance` Locked For | +|----------|------------------------| +| **Without** early release | 30 min 10 sec (entire workflow) | +| **With** early release | 10 sec (only while its tasks run) | + +Once the 5 tasks on `tm1-finance` complete, RushTI logs out from that instance immediately, freeing it for other workflows. The session on `tm1-reporting` continues until its tasks finish. + +### What You See in Logs + +``` +Executing task 5/100: RunExtract on tm1-finance +Early session release: logged out from tm1-finance (no remaining tasks) +Executing task 6/100: TransformData on tm1-reporting +... +``` + +### How It Works + +- After each task completes, RushTI checks if any pending or running tasks remain for each connected TM1 instance. +- If an instance has zero remaining tasks, the session is closed immediately. +- In exclusive mode (`--exclusive`), even preserved connections (via `connection_file`) are released early. +- In normal mode, preserved connections are kept open for reuse across runs. + +This feature is always active — no configuration needed. + +--- + ## When to Use Exclusive Mode !!! tip "Use For" diff --git a/src/rushti/cli.py b/src/rushti/cli.py index 1a84cc3..318a4b1 100644 --- a/src/rushti/cli.py +++ b/src/rushti/cli.py @@ -1240,6 +1240,8 @@ def main() -> int: checkpoint_manager=checkpoint_manager, task_optimizer=task_optimizer, stage_workers=stage_workers, + tm1_preserve_connections=preserve_connections, + force_logout=exclusive_mode, ) ) success = True diff --git a/src/rushti/dag.py b/src/rushti/dag.py index d7ab585..641ebdd 100644 --- a/src/rushti/dag.py +++ b/src/rushti/dag.py @@ -303,6 +303,23 @@ def get_execution_results(self) -> Dict[str, bool]: """ return dict(self._results) + def get_remaining_tasks_by_instance(self) -> Dict[str, int]: + """Get count of remaining (non-completed) tasks per TM1 instance. + + A task is considered remaining if its status is PENDING or RUNNING. + + :return: Dictionary mapping instance_name to count of remaining tasks + """ + counts: Dict[str, int] = {} + for task_id, status in self._status.items(): + if status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.SKIPPED): + continue + for task in self._tasks.get(task_id, []): + instance = getattr(task, "instance_name", None) + if instance: + counts[instance] = counts.get(instance, 0) + 1 + return counts + def __len__(self) -> int: """Return the number of unique task IDs in the DAG.""" return len(self._tasks) diff --git a/src/rushti/execution.py b/src/rushti/execution.py index 270d02f..a1d5095 100644 --- a/src/rushti/execution.py +++ b/src/rushti/execution.py @@ -550,6 +550,8 @@ async def work_through_tasks_dag( checkpoint_manager: "CheckpointManager" = None, task_optimizer: "TaskOptimizer" = None, stage_workers: Optional[Dict[str, int]] = None, + tm1_preserve_connections: Optional[Dict] = None, + force_logout: bool = False, ) -> List[bool]: """Execute tasks using DAG-based scheduling. @@ -563,6 +565,11 @@ async def work_through_tasks_dag( in addition to the global max_workers ceiling. The global max_workers always takes precedence as the absolute cap. + When tm1_preserve_connections is provided, early session release is enabled: + after each task completion, instances with no remaining tasks are logged out + immediately to free TM1 server resources and release exclusive locks. + Instances marked as preserved are exempt unless force_logout is True. + :param ctx: The current execution context :param dag: DAG containing tasks and their dependencies :param max_workers: Maximum number of concurrent workers @@ -571,11 +578,17 @@ async def work_through_tasks_dag( :param checkpoint_manager: Optional CheckpointManager for resume support :param task_optimizer: Optional TaskOptimizer for runtime-based scheduling :param stage_workers: Optional per-stage worker limits (e.g. {"extract": 8, "load": 4}) + :param tm1_preserve_connections: Optional dict indicating which connections to preserve. + When provided, enables early session release. Preserved connections are exempt + from early release unless force_logout is True. + :param force_logout: If True, force logout even from preserved connections (exclusive mode) :return: List of execution outcomes (True/False for each task) """ outcomes = [] loop = asyncio.get_event_loop() task_start_times: Dict[str, datetime] = {} + # Track instances that have been early-released to avoid double-logout + released_instances: set = set() with ThreadPoolExecutor(int(max_workers)) as executor: # Map futures to tasks @@ -672,6 +685,23 @@ def submit_ready_tasks(): error_message=None if success else "Task failed", ) + # Early session release: logout from instances with no remaining tasks. + # Only active when tm1_preserve_connections is provided (i.e. called from CLI). + # Direct callers (e.g. integration tests) that don't pass it won't trigger + # early release, preserving shared connection state across test methods. + if tm1_preserve_connections is not None: + remaining = dag.get_remaining_tasks_by_instance() + for instance_name in list(tm1_services.keys()): + if instance_name not in remaining and instance_name not in released_instances: + released = _logout_instance( + instance_name, + tm1_services, + tm1_preserve_connections, + force_logout, + ) + if released: + released_instances.add(instance_name) + # Submit newly ready tasks submit_ready_tasks() @@ -682,6 +712,40 @@ def submit_ready_tasks(): return outcomes +def _logout_instance( + instance_name: str, + tm1_services: Dict, + tm1_preserve_connections: Dict, + force: bool = False, +): + """Logout from a single TM1 instance. + + Used for early session release when an instance has no remaining tasks. + Does NOT remove the instance from tm1_services — the caller tracks + which instances have been released to avoid double-release. + + :param instance_name: Name of the TM1 instance to logout from + :param tm1_services: Dictionary of TM1Service instances + :param tm1_preserve_connections: Dictionary indicating which connections to preserve + :param force: If True, logout even from preserved connections + :return: True if logout was performed, False if skipped + """ + if instance_name not in tm1_services: + return False + + if not force and tm1_preserve_connections.get(instance_name, False): + logger.debug(f"Preserving connection to {instance_name} (early release skipped)") + return False + + try: + tm1_services[instance_name].logout() + logger.info(f"Early session release: logged out from {instance_name} (no remaining tasks)") + return True + except Exception as e: + logger.warning(f"Failed early logout from {instance_name}: {e}") + return False + + def logout( tm1_services: Dict, tm1_preserve_connections: Dict, diff --git a/tests/config.ini.template b/tests/config.ini.template index 4097523..997589b 100644 --- a/tests/config.ini.template +++ b/tests/config.ini.template @@ -10,8 +10,15 @@ # 1. RUSHTI_TEST_CONFIG environment variable (path to a config.ini file) # 2. tests/config.ini (this file, when copied and filled in) # -# For CI/CD (GitHub Actions), set the RUSHTI_TEST_TM1_CONFIG secret to the -# full content of a config.ini file. The CI workflow writes it to tests/config.ini. +# For CI/CD (GitHub Actions), configure either: +# +# Recommended: Variable + Secret (connection details visible, password protected) +# - Variable RUSHTI_TEST_TM1_CONFIG: config.ini content WITHOUT password +# - Secret RUSHTI_TEST_TM1_PASSWORD: the password value only +# The CI workflow merges them into tests/config.ini at runtime. +# +# Legacy: Single secret (entire config opaque) +# - Secret RUSHTI_TEST_TM1_CONFIG: full config.ini content including password # # Multi-Instance Testing: # You can define multiple TM1 instances. Tests that require a specific version diff --git a/tests/unit/test_early_session_release.py b/tests/unit/test_early_session_release.py new file mode 100644 index 0000000..0e157a8 --- /dev/null +++ b/tests/unit/test_early_session_release.py @@ -0,0 +1,383 @@ +"""Unit tests for early session release in DAG execution. + +Tests that TM1 sessions are released as soon as an instance has no remaining +tasks, rather than waiting for the entire workflow to complete. + +See: https://github.com/cubewise-code/rushti/issues/135 +""" + +import asyncio +import os +import sys +import time +import unittest +from unittest.mock import MagicMock, patch + +# Path setup handled by conftest.py, but also support direct execution +_src_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "src" +) +if _src_path not in sys.path: + sys.path.insert(0, _src_path) + +from rushti.dag import DAG # noqa: E402 +from rushti.task import Task # noqa: E402 +from rushti.execution import ( # noqa: E402 + work_through_tasks_dag, + _logout_instance, + ExecutionContext, +) + + +def _make_task(task_id, instance="tm1srv01", process="proc"): + """Helper to create a Task with a specific id and instance.""" + task = Task(instance, process, {}) + task.id = task_id + return task + + +class TestGetRemainingTasksByInstance(unittest.TestCase): + """Tests for DAG.get_remaining_tasks_by_instance().""" + + def test_all_pending(self): + """All tasks pending: all instances should appear in remaining.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + dag.add_task(_make_task("3", instance="A")) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 2, "B": 1}) + + def test_some_completed(self): + """Completed tasks should not appear in remaining counts.""" + dag = DAG() + t1 = _make_task("1", instance="A") + t2 = _make_task("2", instance="A") + t3 = _make_task("3", instance="B") + dag.add_task(t1) + dag.add_task(t2) + dag.add_task(t3) + + dag.mark_running(t1) + dag.mark_complete(t1, True) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 1, "B": 1}) + + def test_instance_fully_completed(self): + """When all tasks for an instance complete, it should not appear.""" + dag = DAG() + t1 = _make_task("1", instance="A") + t2 = _make_task("2", instance="B") + dag.add_task(t1) + dag.add_task(t2) + + dag.mark_running(t1) + dag.mark_complete(t1, True) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertNotIn("A", remaining) + self.assertEqual(remaining, {"B": 1}) + + def test_failed_tasks_not_remaining(self): + """Failed tasks should not count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_running(t1) + dag.mark_complete(t1, False) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {}) + + def test_skipped_tasks_not_remaining(self): + """Skipped tasks should not count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_skipped("1") + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {}) + + def test_running_tasks_still_remaining(self): + """Running tasks should count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_running(t1) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 1}) + + def test_empty_dag(self): + """Empty DAG returns empty dict.""" + dag = DAG() + self.assertEqual(dag.get_remaining_tasks_by_instance(), {}) + + +class TestLogoutInstance(unittest.TestCase): + """Tests for _logout_instance() helper.""" + + def test_logout_calls_logout_and_returns_true(self): + """Successful logout calls logout on the service and returns True.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1, "B": MagicMock()} + + result = _logout_instance("A", services, {}, force=False) + + mock_tm1.logout.assert_called_once() + self.assertTrue(result) + # Dict is NOT modified — caller tracks released instances separately + self.assertIn("A", services) + + def test_preserved_connection_skipped_without_force(self): + """Preserved connections are not logged out without force.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1} + preserve = {"A": True} + + result = _logout_instance("A", services, preserve, force=False) + + mock_tm1.logout.assert_not_called() + self.assertFalse(result) + + def test_preserved_connection_forced(self): + """Force mode logs out even preserved connections.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1} + preserve = {"A": True} + + result = _logout_instance("A", services, preserve, force=True) + + mock_tm1.logout.assert_called_once() + self.assertTrue(result) + + def test_nonexistent_instance_is_noop(self): + """Calling with a non-existent instance returns False.""" + services = {"B": MagicMock()} + + result = _logout_instance("A", services, {}) + + self.assertFalse(result) + self.assertIn("B", services) + + def test_logout_exception_does_not_raise(self): + """Logout failure is caught and logged, returns False.""" + mock_tm1 = MagicMock() + mock_tm1.logout.side_effect = Exception("connection lost") + services = {"A": mock_tm1} + + result = _logout_instance("A", services, {}) + + self.assertFalse(result) + + +class TestEarlySessionRelease(unittest.TestCase): + """Tests for early session release during DAG execution.""" + + def _build_mock_execute_task(self, sleep_by_instance=None): + """Build a mock execute_task that optionally sleeps based on instance. + + :param sleep_by_instance: dict of instance_name -> sleep_seconds + :return: mock function + """ + sleep_by_instance = sleep_by_instance or {} + + def mock_execute_task(ctx, task, retries, tm1_services): + sleep_time = sleep_by_instance.get(task.instance_name, 0.01) + time.sleep(sleep_time) + return True + + return mock_execute_task + + def _run_dag_with_early_release( + self, + dag, + tm1_services, + max_workers=4, + preserve=None, + force_logout=False, + sleep_by_instance=None, + ): + """Run DAG with early session release enabled.""" + preserve = preserve if preserve is not None else {} + mock_fn = self._build_mock_execute_task(sleep_by_instance) + + with patch("rushti.execution.execute_task", mock_fn): + loop = asyncio.new_event_loop() + try: + results = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + max_workers, + 0, + tm1_services, + tm1_preserve_connections=preserve, + force_logout=force_logout, + ) + ) + finally: + loop.close() + return results + + def test_multi_instance_early_release(self): + """Instance A tasks finish first; A is logged out before B tasks complete.""" + dag = DAG() + # 1 fast task on A, 3 slower tasks on B + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + dag.add_task(_make_task("3", instance="B")) + dag.add_task(_make_task("4", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + results = self._run_dag_with_early_release( + dag, + services, + max_workers=4, + sleep_by_instance={"A": 0.01, "B": 0.1}, + ) + + self.assertTrue(all(results)) + # A should have been logged out early + mock_a.logout.assert_called_once() + # Services dict is NOT modified (instances tracked via released_instances set) + self.assertIn("A", services) + + def test_single_instance_early_release(self): + """All tasks on one instance: logout once all tasks complete.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="A")) + + mock_a = MagicMock() + services = {"A": mock_a} + + results = self._run_dag_with_early_release(dag, services, max_workers=4) + + self.assertTrue(all(results)) + # A is logged out once after the last task completes + mock_a.logout.assert_called_once() + + def test_preserved_connection_not_released_early(self): + """Preserved connections are not released early without force.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + preserve = {"A": True} + + results = self._run_dag_with_early_release( + dag, services, preserve=preserve, force_logout=False + ) + + self.assertTrue(all(results)) + # A is preserved, should not be logged out + mock_a.logout.assert_not_called() + self.assertIn("A", services) + # B should be released + mock_b.logout.assert_called_once() + + def test_force_logout_releases_preserved(self): + """Force mode (exclusive) releases even preserved connections.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + preserve = {"A": True} + + results = self._run_dag_with_early_release( + dag, services, preserve=preserve, force_logout=True + ) + + self.assertTrue(all(results)) + # Both should be released even though A is preserved + mock_a.logout.assert_called_once() + mock_b.logout.assert_called_once() + + def test_no_early_release_without_preserve_connections(self): + """No early release when tm1_preserve_connections is not provided. + + This preserves shared connection state for callers (like integration + tests) that reuse tm1_services across multiple executions. + """ + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + mock_fn = self._build_mock_execute_task() + with patch("rushti.execution.execute_task", mock_fn): + loop = asyncio.new_event_loop() + try: + results = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + 4, + 0, + services, + # tm1_preserve_connections not passed (None) — no early release + ) + ) + finally: + loop.close() + + self.assertTrue(all(results)) + # Neither should be logged out (early release not active) + mock_a.logout.assert_not_called() + mock_b.logout.assert_not_called() + + def test_early_release_with_dependencies(self): + """Early release works correctly with DAG dependencies. + + A1 -> B1 -> B2: Instance A should release after A1 completes, + even though B tasks depend on it. + """ + dag = DAG() + t_a1 = _make_task("1", instance="A") + t_b1 = _make_task("2", instance="B") + t_b2 = _make_task("3", instance="B") + dag.add_task(t_a1) + dag.add_task(t_b1) + dag.add_task(t_b2) + dag.add_dependency("2", "1") # B1 depends on A1 + dag.add_dependency("3", "2") # B2 depends on B1 + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + results = self._run_dag_with_early_release(dag, services, max_workers=4) + + self.assertTrue(all(results)) + self.assertEqual(len(results), 3) + # A should have been released early after A1 completed + mock_a.logout.assert_called_once() + + +if __name__ == "__main__": + unittest.main()