From 9212a70a7881c9077d71852d3b4a60ad3176ae66 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Thu, 2 Apr 2026 14:48:53 -0300 Subject: [PATCH 1/3] Add early session release for TM1 instances with no remaining tasks When a workflow spans multiple TM1 instances, sessions are now released as soon as an instance has no pending or running tasks, rather than waiting for the entire workflow to complete. This significantly reduces exclusive mode blocking time for multi-instance workflows. Closes #135 Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/features/exclusive-mode.md | 35 +++ src/rushti/cli.py | 2 + src/rushti/dag.py | 17 + src/rushti/execution.py | 51 +++ tests/unit/test_early_session_release.py | 380 +++++++++++++++++++++++ 5 files changed, 485 insertions(+) create mode 100644 tests/unit/test_early_session_release.py diff --git a/docs/features/exclusive-mode.md b/docs/features/exclusive-mode.md index 5724018..9d9a59d 100644 --- a/docs/features/exclusive-mode.md +++ b/docs/features/exclusive-mode.md @@ -181,6 +181,41 @@ timeout = 1800 # Wait up to 30 minutes --- +## Early Session Release + +When a workflow spans multiple TM1 instances, RushTI automatically releases sessions from instances that have no remaining tasks — without waiting for the entire workflow to finish. This is especially valuable in exclusive mode, where holding an idle session blocks other RushTI instances from accessing that server. + +### Example + +Consider a workflow with 100 tasks: 5 tasks on `tm1-finance` (10 seconds) and 95 tasks on `tm1-reporting` (30 minutes). + +| Behavior | `tm1-finance` Locked For | +|----------|------------------------| +| **Without** early release | 30 min 10 sec (entire workflow) | +| **With** early release | 10 sec (only while its tasks run) | + +Once the 5 tasks on `tm1-finance` complete, RushTI logs out from that instance immediately, freeing it for other workflows. The session on `tm1-reporting` continues until its tasks finish. + +### What You See in Logs + +``` +Executing task 5/100: RunExtract on tm1-finance +Early session release: logged out from tm1-finance (no remaining tasks) +Executing task 6/100: TransformData on tm1-reporting +... +``` + +### How It Works + +- After each task completes, RushTI checks if any pending or running tasks remain for each connected TM1 instance. +- If an instance has zero remaining tasks, the session is closed immediately. +- In exclusive mode (`--exclusive`), even preserved connections (via `connection_file`) are released early. +- In normal mode, preserved connections are kept open for reuse across runs. + +This feature is always active — no configuration needed. + +--- + ## When to Use Exclusive Mode !!! tip "Use For" diff --git a/src/rushti/cli.py b/src/rushti/cli.py index 1a84cc3..318a4b1 100644 --- a/src/rushti/cli.py +++ b/src/rushti/cli.py @@ -1240,6 +1240,8 @@ def main() -> int: checkpoint_manager=checkpoint_manager, task_optimizer=task_optimizer, stage_workers=stage_workers, + tm1_preserve_connections=preserve_connections, + force_logout=exclusive_mode, ) ) success = True diff --git a/src/rushti/dag.py b/src/rushti/dag.py index d7ab585..641ebdd 100644 --- a/src/rushti/dag.py +++ b/src/rushti/dag.py @@ -303,6 +303,23 @@ def get_execution_results(self) -> Dict[str, bool]: """ return dict(self._results) + def get_remaining_tasks_by_instance(self) -> Dict[str, int]: + """Get count of remaining (non-completed) tasks per TM1 instance. + + A task is considered remaining if its status is PENDING or RUNNING. + + :return: Dictionary mapping instance_name to count of remaining tasks + """ + counts: Dict[str, int] = {} + for task_id, status in self._status.items(): + if status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.SKIPPED): + continue + for task in self._tasks.get(task_id, []): + instance = getattr(task, "instance_name", None) + if instance: + counts[instance] = counts.get(instance, 0) + 1 + return counts + def __len__(self) -> int: """Return the number of unique task IDs in the DAG.""" return len(self._tasks) diff --git a/src/rushti/execution.py b/src/rushti/execution.py index 270d02f..67585de 100644 --- a/src/rushti/execution.py +++ b/src/rushti/execution.py @@ -550,6 +550,8 @@ async def work_through_tasks_dag( checkpoint_manager: "CheckpointManager" = None, task_optimizer: "TaskOptimizer" = None, stage_workers: Optional[Dict[str, int]] = None, + tm1_preserve_connections: Optional[Dict] = None, + force_logout: bool = False, ) -> List[bool]: """Execute tasks using DAG-based scheduling. @@ -563,6 +565,11 @@ async def work_through_tasks_dag( in addition to the global max_workers ceiling. The global max_workers always takes precedence as the absolute cap. + After each task completion, instances with no remaining tasks are logged out + immediately (early session release) to free TM1 server resources and release + exclusive locks. Instances marked in tm1_preserve_connections are exempt + from early release unless force_logout is True (exclusive mode). + :param ctx: The current execution context :param dag: DAG containing tasks and their dependencies :param max_workers: Maximum number of concurrent workers @@ -571,6 +578,9 @@ async def work_through_tasks_dag( :param checkpoint_manager: Optional CheckpointManager for resume support :param task_optimizer: Optional TaskOptimizer for runtime-based scheduling :param stage_workers: Optional per-stage worker limits (e.g. {"extract": 8, "load": 4}) + :param tm1_preserve_connections: Optional dict indicating which connections to preserve. + Preserved connections are exempt from early release unless force_logout is True. + :param force_logout: If True, force logout even from preserved connections (exclusive mode) :return: List of execution outcomes (True/False for each task) """ outcomes = [] @@ -672,6 +682,17 @@ def submit_ready_tasks(): error_message=None if success else "Task failed", ) + # Early session release: logout from instances with no remaining tasks + remaining = dag.get_remaining_tasks_by_instance() + for instance_name in list(tm1_services.keys()): + if instance_name not in remaining: + _logout_instance( + instance_name, + tm1_services, + tm1_preserve_connections or {}, + force_logout, + ) + # Submit newly ready tasks submit_ready_tasks() @@ -682,6 +703,36 @@ def submit_ready_tasks(): return outcomes +def _logout_instance( + instance_name: str, + tm1_services: Dict, + tm1_preserve_connections: Dict, + force: bool = False, +): + """Logout from a single TM1 instance and remove it from the services dict. + + Used for early session release when an instance has no remaining tasks. + + :param instance_name: Name of the TM1 instance to logout from + :param tm1_services: Dictionary of TM1Service instances (modified in-place) + :param tm1_preserve_connections: Dictionary indicating which connections to preserve + :param force: If True, logout even from preserved connections + """ + if instance_name not in tm1_services: + return + + if not force and tm1_preserve_connections.get(instance_name, False): + logger.debug(f"Preserving connection to {instance_name} (early release skipped)") + return + + try: + tm1_services[instance_name].logout() + del tm1_services[instance_name] + logger.info(f"Early session release: logged out from {instance_name} (no remaining tasks)") + except Exception as e: + logger.warning(f"Failed early logout from {instance_name}: {e}") + + def logout( tm1_services: Dict, tm1_preserve_connections: Dict, diff --git a/tests/unit/test_early_session_release.py b/tests/unit/test_early_session_release.py new file mode 100644 index 0000000..45c8aea --- /dev/null +++ b/tests/unit/test_early_session_release.py @@ -0,0 +1,380 @@ +"""Unit tests for early session release in DAG execution. + +Tests that TM1 sessions are released as soon as an instance has no remaining +tasks, rather than waiting for the entire workflow to complete. + +See: https://github.com/cubewise-code/rushti/issues/135 +""" + +import asyncio +import os +import sys +import time +import unittest +from unittest.mock import MagicMock, patch + +# Path setup handled by conftest.py, but also support direct execution +_src_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "src" +) +if _src_path not in sys.path: + sys.path.insert(0, _src_path) + +from rushti.dag import DAG # noqa: E402 +from rushti.task import Task # noqa: E402 +from rushti.execution import ( # noqa: E402 + work_through_tasks_dag, + _logout_instance, + ExecutionContext, +) + + +def _make_task(task_id, instance="tm1srv01", process="proc"): + """Helper to create a Task with a specific id and instance.""" + task = Task(instance, process, {}) + task.id = task_id + return task + + +class TestGetRemainingTasksByInstance(unittest.TestCase): + """Tests for DAG.get_remaining_tasks_by_instance().""" + + def test_all_pending(self): + """All tasks pending: all instances should appear in remaining.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + dag.add_task(_make_task("3", instance="A")) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 2, "B": 1}) + + def test_some_completed(self): + """Completed tasks should not appear in remaining counts.""" + dag = DAG() + t1 = _make_task("1", instance="A") + t2 = _make_task("2", instance="A") + t3 = _make_task("3", instance="B") + dag.add_task(t1) + dag.add_task(t2) + dag.add_task(t3) + + dag.mark_running(t1) + dag.mark_complete(t1, True) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 1, "B": 1}) + + def test_instance_fully_completed(self): + """When all tasks for an instance complete, it should not appear.""" + dag = DAG() + t1 = _make_task("1", instance="A") + t2 = _make_task("2", instance="B") + dag.add_task(t1) + dag.add_task(t2) + + dag.mark_running(t1) + dag.mark_complete(t1, True) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertNotIn("A", remaining) + self.assertEqual(remaining, {"B": 1}) + + def test_failed_tasks_not_remaining(self): + """Failed tasks should not count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_running(t1) + dag.mark_complete(t1, False) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {}) + + def test_skipped_tasks_not_remaining(self): + """Skipped tasks should not count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_skipped("1") + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {}) + + def test_running_tasks_still_remaining(self): + """Running tasks should count as remaining.""" + dag = DAG() + t1 = _make_task("1", instance="A") + dag.add_task(t1) + + dag.mark_running(t1) + + remaining = dag.get_remaining_tasks_by_instance() + + self.assertEqual(remaining, {"A": 1}) + + def test_empty_dag(self): + """Empty DAG returns empty dict.""" + dag = DAG() + self.assertEqual(dag.get_remaining_tasks_by_instance(), {}) + + +class TestLogoutInstance(unittest.TestCase): + """Tests for _logout_instance() helper.""" + + def test_logout_removes_from_services(self): + """Successful logout removes instance from tm1_services dict.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1, "B": MagicMock()} + + _logout_instance("A", services, {}, force=False) + + mock_tm1.logout.assert_called_once() + self.assertNotIn("A", services) + self.assertIn("B", services) + + def test_preserved_connection_skipped_without_force(self): + """Preserved connections are not logged out without force.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1} + preserve = {"A": True} + + _logout_instance("A", services, preserve, force=False) + + mock_tm1.logout.assert_not_called() + self.assertIn("A", services) + + def test_preserved_connection_forced(self): + """Force mode logs out even preserved connections.""" + mock_tm1 = MagicMock() + services = {"A": mock_tm1} + preserve = {"A": True} + + _logout_instance("A", services, preserve, force=True) + + mock_tm1.logout.assert_called_once() + self.assertNotIn("A", services) + + def test_nonexistent_instance_is_noop(self): + """Calling with a non-existent instance does nothing.""" + services = {"B": MagicMock()} + + _logout_instance("A", services, {}) + + self.assertIn("B", services) + + def test_logout_exception_does_not_raise(self): + """Logout failure is caught and logged, not raised.""" + mock_tm1 = MagicMock() + mock_tm1.logout.side_effect = Exception("connection lost") + services = {"A": mock_tm1} + + # Should not raise + _logout_instance("A", services, {}) + + +class TestEarlySessionRelease(unittest.TestCase): + """Tests for early session release during DAG execution.""" + + def _build_mock_execute_task(self, sleep_by_instance=None): + """Build a mock execute_task that optionally sleeps based on instance. + + :param sleep_by_instance: dict of instance_name -> sleep_seconds + :return: mock function + """ + sleep_by_instance = sleep_by_instance or {} + + def mock_execute_task(ctx, task, retries, tm1_services): + sleep_time = sleep_by_instance.get(task.instance_name, 0.01) + time.sleep(sleep_time) + return True + + return mock_execute_task + + def _run_dag_with_early_release( + self, + dag, + tm1_services, + max_workers=4, + preserve=None, + force_logout=False, + sleep_by_instance=None, + ): + """Run DAG with early session release enabled.""" + preserve = preserve if preserve is not None else {} + mock_fn = self._build_mock_execute_task(sleep_by_instance) + + with patch("rushti.execution.execute_task", mock_fn): + loop = asyncio.new_event_loop() + try: + results = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + max_workers, + 0, + tm1_services, + tm1_preserve_connections=preserve, + force_logout=force_logout, + ) + ) + finally: + loop.close() + return results + + def test_multi_instance_early_release(self): + """Instance A tasks finish first; A is logged out before B tasks complete.""" + dag = DAG() + # 1 fast task on A, 3 slower tasks on B + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + dag.add_task(_make_task("3", instance="B")) + dag.add_task(_make_task("4", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + results = self._run_dag_with_early_release( + dag, + services, + max_workers=4, + sleep_by_instance={"A": 0.01, "B": 0.1}, + ) + + self.assertTrue(all(results)) + # A should have been logged out early + mock_a.logout.assert_called_once() + # A should have been removed from services dict + self.assertNotIn("A", services) + + def test_single_instance_no_early_release(self): + """All tasks on one instance: no early logout (final logout handles it).""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="A")) + + mock_a = MagicMock() + services = {"A": mock_a} + + results = self._run_dag_with_early_release(dag, services, max_workers=4) + + self.assertTrue(all(results)) + # A is logged out on the last task completion (early release) + mock_a.logout.assert_called_once() + self.assertNotIn("A", services) + + def test_preserved_connection_not_released_early(self): + """Preserved connections are not released early without force.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + preserve = {"A": True} + + results = self._run_dag_with_early_release( + dag, services, preserve=preserve, force_logout=False + ) + + self.assertTrue(all(results)) + # A is preserved, should not be logged out + mock_a.logout.assert_not_called() + self.assertIn("A", services) + # B should be released + mock_b.logout.assert_called_once() + + def test_force_logout_releases_preserved(self): + """Force mode (exclusive) releases even preserved connections.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + preserve = {"A": True} + + results = self._run_dag_with_early_release( + dag, services, preserve=preserve, force_logout=True + ) + + self.assertTrue(all(results)) + # Both should be released even though A is preserved + mock_a.logout.assert_called_once() + mock_b.logout.assert_called_once() + + def test_early_release_without_preserve_connections(self): + """Early release happens by default even without tm1_preserve_connections.""" + dag = DAG() + dag.add_task(_make_task("1", instance="A")) + dag.add_task(_make_task("2", instance="B")) + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + mock_fn = self._build_mock_execute_task() + with patch("rushti.execution.execute_task", mock_fn): + loop = asyncio.new_event_loop() + try: + results = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + 4, + 0, + services, + # tm1_preserve_connections not passed (None) — early release still active + ) + ) + finally: + loop.close() + + self.assertTrue(all(results)) + # Both should be released (early release is always active) + mock_a.logout.assert_called_once() + mock_b.logout.assert_called_once() + self.assertNotIn("A", services) + self.assertNotIn("B", services) + + def test_early_release_with_dependencies(self): + """Early release works correctly with DAG dependencies. + + A1 -> B1 -> B2: Instance A should release after A1 completes, + even though B tasks depend on it. + """ + dag = DAG() + t_a1 = _make_task("1", instance="A") + t_b1 = _make_task("2", instance="B") + t_b2 = _make_task("3", instance="B") + dag.add_task(t_a1) + dag.add_task(t_b1) + dag.add_task(t_b2) + dag.add_dependency("2", "1") # B1 depends on A1 + dag.add_dependency("3", "2") # B2 depends on B1 + + mock_a = MagicMock() + mock_b = MagicMock() + services = {"A": mock_a, "B": mock_b} + + results = self._run_dag_with_early_release(dag, services, max_workers=4) + + self.assertTrue(all(results)) + self.assertEqual(len(results), 3) + # A should have been released early after A1 completed + mock_a.logout.assert_called_once() + self.assertNotIn("A", services) + + +if __name__ == "__main__": + unittest.main() From b8993a1184e3b52dc15e9195a2e9efadf0be9021 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Thu, 2 Apr 2026 15:14:36 -0300 Subject: [PATCH 2/3] Fix integration tests: don't mutate tm1_services dict in early release Early release was deleting entries from tm1_services, which broke integration tests that share the dict across test methods via setUpClass. Now tracks released instances in a local set instead. Also splits CI TM1 config into variable (connection details) + secret (password only) for better visibility and debugging. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/tests.yml | 53 +++++++++++++++++++----- src/rushti/execution.py | 22 ++++++---- tests/config.ini.template | 11 ++++- tests/unit/test_early_session_release.py | 45 ++++++++++---------- 4 files changed, 88 insertions(+), 43 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 38e6242..b8df830 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -6,10 +6,25 @@ # Unit tests and linting run on every push and PR. # Integration tests only run on PRs to master (expensive, ~7 min). # -# Required secrets for integration tests: -# RUSHTI_TEST_TM1_CONFIG - Full content of a config.ini file with TM1 connection details. -# The file format follows the standard RushTI config.ini format. -# See tests/config.ini.template for the expected structure. +# Required configuration for integration tests: +# +# Variable (non-sensitive connection details): +# RUSHTI_TEST_TM1_CONFIG - config.ini content WITHOUT passwords. +# Example: +# [tm1srv01] +# base_url = https://your-server/tm1/api/tm1 +# user = your_user +# namespace = LDAP +# ssl = true +# verify = true +# async_requests_mode = true +# +# Secret (sensitive credentials only): +# RUSHTI_TEST_TM1_PASSWORD - The password for tm1srv01. +# +# Legacy (still supported): +# Secret: RUSHTI_TEST_TM1_CONFIG - Full config.ini content including password. +# If the variable + secret pair is configured, it takes precedence over the legacy secret. name: Tests @@ -103,15 +118,25 @@ jobs: - name: Check TM1 configuration id: check-tm1 run: | - if [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then + if [ -n "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" ] && [ -n "${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" ]; then + echo "tm1_configured=true" >> $GITHUB_OUTPUT + echo "config_source=variable" >> $GITHUB_OUTPUT + elif [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then echo "tm1_configured=true" >> $GITHUB_OUTPUT + echo "config_source=legacy_secret" >> $GITHUB_OUTPUT else echo "tm1_configured=false" >> $GITHUB_OUTPUT - echo "::warning::TM1 config secret not configured. Integration tests will be skipped." + echo "::warning::TM1 config not configured. Integration tests will be skipped." fi - - name: Create TM1 config file - if: steps.check-tm1.outputs.tm1_configured == 'true' + - name: Create TM1 config file (from variable + secret) + if: steps.check-tm1.outputs.config_source == 'variable' + run: | + echo "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini + echo "password = ${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" >> tests/config.ini + + - name: Create TM1 config file (legacy - full secret) + if: steps.check-tm1.outputs.config_source == 'legacy_secret' run: | echo "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini @@ -129,9 +154,15 @@ jobs: - name: Skip message if: steps.check-tm1.outputs.tm1_configured != 'true' run: | - echo "Integration tests skipped - TM1 config secret not configured" - echo "To enable, add the RUSHTI_TEST_TM1_CONFIG secret to your repository." - echo "The secret should contain the full config.ini content with TM1 connection details." + echo "Integration tests skipped - TM1 config not configured" + echo "" + echo "To enable (recommended):" + echo " 1. Add variable RUSHTI_TEST_TM1_CONFIG with connection details (no password)" + echo " 2. Add secret RUSHTI_TEST_TM1_PASSWORD with the TM1 password" + echo "" + echo "Legacy (still supported):" + echo " Add secret RUSHTI_TEST_TM1_CONFIG with the full config.ini content" + echo "" echo "See tests/config.ini.template for the expected format." lint: diff --git a/src/rushti/execution.py b/src/rushti/execution.py index 67585de..b95a637 100644 --- a/src/rushti/execution.py +++ b/src/rushti/execution.py @@ -586,6 +586,8 @@ async def work_through_tasks_dag( outcomes = [] loop = asyncio.get_event_loop() task_start_times: Dict[str, datetime] = {} + # Track instances that have been early-released to avoid double-logout + released_instances: set = set() with ThreadPoolExecutor(int(max_workers)) as executor: # Map futures to tasks @@ -685,13 +687,15 @@ def submit_ready_tasks(): # Early session release: logout from instances with no remaining tasks remaining = dag.get_remaining_tasks_by_instance() for instance_name in list(tm1_services.keys()): - if instance_name not in remaining: - _logout_instance( + if instance_name not in remaining and instance_name not in released_instances: + released = _logout_instance( instance_name, tm1_services, tm1_preserve_connections or {}, force_logout, ) + if released: + released_instances.add(instance_name) # Submit newly ready tasks submit_ready_tasks() @@ -709,28 +713,32 @@ def _logout_instance( tm1_preserve_connections: Dict, force: bool = False, ): - """Logout from a single TM1 instance and remove it from the services dict. + """Logout from a single TM1 instance. Used for early session release when an instance has no remaining tasks. + Does NOT remove the instance from tm1_services — the caller tracks + which instances have been released to avoid double-release. :param instance_name: Name of the TM1 instance to logout from - :param tm1_services: Dictionary of TM1Service instances (modified in-place) + :param tm1_services: Dictionary of TM1Service instances :param tm1_preserve_connections: Dictionary indicating which connections to preserve :param force: If True, logout even from preserved connections + :return: True if logout was performed, False if skipped """ if instance_name not in tm1_services: - return + return False if not force and tm1_preserve_connections.get(instance_name, False): logger.debug(f"Preserving connection to {instance_name} (early release skipped)") - return + return False try: tm1_services[instance_name].logout() - del tm1_services[instance_name] logger.info(f"Early session release: logged out from {instance_name} (no remaining tasks)") + return True except Exception as e: logger.warning(f"Failed early logout from {instance_name}: {e}") + return False def logout( diff --git a/tests/config.ini.template b/tests/config.ini.template index 4097523..997589b 100644 --- a/tests/config.ini.template +++ b/tests/config.ini.template @@ -10,8 +10,15 @@ # 1. RUSHTI_TEST_CONFIG environment variable (path to a config.ini file) # 2. tests/config.ini (this file, when copied and filled in) # -# For CI/CD (GitHub Actions), set the RUSHTI_TEST_TM1_CONFIG secret to the -# full content of a config.ini file. The CI workflow writes it to tests/config.ini. +# For CI/CD (GitHub Actions), configure either: +# +# Recommended: Variable + Secret (connection details visible, password protected) +# - Variable RUSHTI_TEST_TM1_CONFIG: config.ini content WITHOUT password +# - Secret RUSHTI_TEST_TM1_PASSWORD: the password value only +# The CI workflow merges them into tests/config.ini at runtime. +# +# Legacy: Single secret (entire config opaque) +# - Secret RUSHTI_TEST_TM1_CONFIG: full config.ini content including password # # Multi-Instance Testing: # You can define multiple TM1 instances. Tests that require a specific version diff --git a/tests/unit/test_early_session_release.py b/tests/unit/test_early_session_release.py index 45c8aea..9bfde9a 100644 --- a/tests/unit/test_early_session_release.py +++ b/tests/unit/test_early_session_release.py @@ -129,16 +129,17 @@ def test_empty_dag(self): class TestLogoutInstance(unittest.TestCase): """Tests for _logout_instance() helper.""" - def test_logout_removes_from_services(self): - """Successful logout removes instance from tm1_services dict.""" + def test_logout_calls_logout_and_returns_true(self): + """Successful logout calls logout on the service and returns True.""" mock_tm1 = MagicMock() services = {"A": mock_tm1, "B": MagicMock()} - _logout_instance("A", services, {}, force=False) + result = _logout_instance("A", services, {}, force=False) mock_tm1.logout.assert_called_once() - self.assertNotIn("A", services) - self.assertIn("B", services) + self.assertTrue(result) + # Dict is NOT modified — caller tracks released instances separately + self.assertIn("A", services) def test_preserved_connection_skipped_without_force(self): """Preserved connections are not logged out without force.""" @@ -146,10 +147,10 @@ def test_preserved_connection_skipped_without_force(self): services = {"A": mock_tm1} preserve = {"A": True} - _logout_instance("A", services, preserve, force=False) + result = _logout_instance("A", services, preserve, force=False) mock_tm1.logout.assert_not_called() - self.assertIn("A", services) + self.assertFalse(result) def test_preserved_connection_forced(self): """Force mode logs out even preserved connections.""" @@ -157,27 +158,29 @@ def test_preserved_connection_forced(self): services = {"A": mock_tm1} preserve = {"A": True} - _logout_instance("A", services, preserve, force=True) + result = _logout_instance("A", services, preserve, force=True) mock_tm1.logout.assert_called_once() - self.assertNotIn("A", services) + self.assertTrue(result) def test_nonexistent_instance_is_noop(self): - """Calling with a non-existent instance does nothing.""" + """Calling with a non-existent instance returns False.""" services = {"B": MagicMock()} - _logout_instance("A", services, {}) + result = _logout_instance("A", services, {}) + self.assertFalse(result) self.assertIn("B", services) def test_logout_exception_does_not_raise(self): - """Logout failure is caught and logged, not raised.""" + """Logout failure is caught and logged, returns False.""" mock_tm1 = MagicMock() mock_tm1.logout.side_effect = Exception("connection lost") services = {"A": mock_tm1} - # Should not raise - _logout_instance("A", services, {}) + result = _logout_instance("A", services, {}) + + self.assertFalse(result) class TestEarlySessionRelease(unittest.TestCase): @@ -252,11 +255,11 @@ def test_multi_instance_early_release(self): self.assertTrue(all(results)) # A should have been logged out early mock_a.logout.assert_called_once() - # A should have been removed from services dict - self.assertNotIn("A", services) + # Services dict is NOT modified (instances tracked via released_instances set) + self.assertIn("A", services) - def test_single_instance_no_early_release(self): - """All tasks on one instance: no early logout (final logout handles it).""" + def test_single_instance_early_release(self): + """All tasks on one instance: logout once all tasks complete.""" dag = DAG() dag.add_task(_make_task("1", instance="A")) dag.add_task(_make_task("2", instance="A")) @@ -267,9 +270,8 @@ def test_single_instance_no_early_release(self): results = self._run_dag_with_early_release(dag, services, max_workers=4) self.assertTrue(all(results)) - # A is logged out on the last task completion (early release) + # A is logged out once after the last task completes mock_a.logout.assert_called_once() - self.assertNotIn("A", services) def test_preserved_connection_not_released_early(self): """Preserved connections are not released early without force.""" @@ -344,8 +346,6 @@ def test_early_release_without_preserve_connections(self): # Both should be released (early release is always active) mock_a.logout.assert_called_once() mock_b.logout.assert_called_once() - self.assertNotIn("A", services) - self.assertNotIn("B", services) def test_early_release_with_dependencies(self): """Early release works correctly with DAG dependencies. @@ -373,7 +373,6 @@ def test_early_release_with_dependencies(self): self.assertEqual(len(results), 3) # A should have been released early after A1 completed mock_a.logout.assert_called_once() - self.assertNotIn("A", services) if __name__ == "__main__": From b4567a97fe5aeb7f5c9b4131de3c0e0007f88009 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Thu, 2 Apr 2026 15:25:32 -0300 Subject: [PATCH 3/3] Fix early release: gate on tm1_preserve_connections to protect shared connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Early release must only fire when tm1_preserve_connections is explicitly passed (i.e. from CLI). Integration tests share TM1Service objects across test methods via setUpClass — logging out a shared service corrupted the HTTP session state ("multiple cookies with name TM1SessionId"). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/rushti/execution.py | 39 +++++++++++++----------- tests/unit/test_early_session_release.py | 16 ++++++---- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/rushti/execution.py b/src/rushti/execution.py index b95a637..a1d5095 100644 --- a/src/rushti/execution.py +++ b/src/rushti/execution.py @@ -565,10 +565,10 @@ async def work_through_tasks_dag( in addition to the global max_workers ceiling. The global max_workers always takes precedence as the absolute cap. - After each task completion, instances with no remaining tasks are logged out - immediately (early session release) to free TM1 server resources and release - exclusive locks. Instances marked in tm1_preserve_connections are exempt - from early release unless force_logout is True (exclusive mode). + When tm1_preserve_connections is provided, early session release is enabled: + after each task completion, instances with no remaining tasks are logged out + immediately to free TM1 server resources and release exclusive locks. + Instances marked as preserved are exempt unless force_logout is True. :param ctx: The current execution context :param dag: DAG containing tasks and their dependencies @@ -579,7 +579,8 @@ async def work_through_tasks_dag( :param task_optimizer: Optional TaskOptimizer for runtime-based scheduling :param stage_workers: Optional per-stage worker limits (e.g. {"extract": 8, "load": 4}) :param tm1_preserve_connections: Optional dict indicating which connections to preserve. - Preserved connections are exempt from early release unless force_logout is True. + When provided, enables early session release. Preserved connections are exempt + from early release unless force_logout is True. :param force_logout: If True, force logout even from preserved connections (exclusive mode) :return: List of execution outcomes (True/False for each task) """ @@ -684,18 +685,22 @@ def submit_ready_tasks(): error_message=None if success else "Task failed", ) - # Early session release: logout from instances with no remaining tasks - remaining = dag.get_remaining_tasks_by_instance() - for instance_name in list(tm1_services.keys()): - if instance_name not in remaining and instance_name not in released_instances: - released = _logout_instance( - instance_name, - tm1_services, - tm1_preserve_connections or {}, - force_logout, - ) - if released: - released_instances.add(instance_name) + # Early session release: logout from instances with no remaining tasks. + # Only active when tm1_preserve_connections is provided (i.e. called from CLI). + # Direct callers (e.g. integration tests) that don't pass it won't trigger + # early release, preserving shared connection state across test methods. + if tm1_preserve_connections is not None: + remaining = dag.get_remaining_tasks_by_instance() + for instance_name in list(tm1_services.keys()): + if instance_name not in remaining and instance_name not in released_instances: + released = _logout_instance( + instance_name, + tm1_services, + tm1_preserve_connections, + force_logout, + ) + if released: + released_instances.add(instance_name) # Submit newly ready tasks submit_ready_tasks() diff --git a/tests/unit/test_early_session_release.py b/tests/unit/test_early_session_release.py index 9bfde9a..0e157a8 100644 --- a/tests/unit/test_early_session_release.py +++ b/tests/unit/test_early_session_release.py @@ -315,8 +315,12 @@ def test_force_logout_releases_preserved(self): mock_a.logout.assert_called_once() mock_b.logout.assert_called_once() - def test_early_release_without_preserve_connections(self): - """Early release happens by default even without tm1_preserve_connections.""" + def test_no_early_release_without_preserve_connections(self): + """No early release when tm1_preserve_connections is not provided. + + This preserves shared connection state for callers (like integration + tests) that reuse tm1_services across multiple executions. + """ dag = DAG() dag.add_task(_make_task("1", instance="A")) dag.add_task(_make_task("2", instance="B")) @@ -336,16 +340,16 @@ def test_early_release_without_preserve_connections(self): 4, 0, services, - # tm1_preserve_connections not passed (None) — early release still active + # tm1_preserve_connections not passed (None) — no early release ) ) finally: loop.close() self.assertTrue(all(results)) - # Both should be released (early release is always active) - mock_a.logout.assert_called_once() - mock_b.logout.assert_called_once() + # Neither should be logged out (early release not active) + mock_a.logout.assert_not_called() + mock_b.logout.assert_not_called() def test_early_release_with_dependencies(self): """Early release works correctly with DAG dependencies.