Merged
53 changes: 42 additions & 11 deletions .github/workflows/tests.yml
@@ -6,10 +6,25 @@
# Unit tests and linting run on every push and PR.
# Integration tests only run on PRs to master (expensive, ~7 min).
#
# Required secrets for integration tests:
# RUSHTI_TEST_TM1_CONFIG - Full content of a config.ini file with TM1 connection details.
# The file format follows the standard RushTI config.ini format.
# See tests/config.ini.template for the expected structure.
# Required configuration for integration tests:
#
# Variable (non-sensitive connection details):
# RUSHTI_TEST_TM1_CONFIG - config.ini content WITHOUT passwords.
# Example:
# [tm1srv01]
# base_url = https://your-server/tm1/api/tm1
# user = your_user
# namespace = LDAP
# ssl = true
# verify = true
# async_requests_mode = true
#
# Secret (sensitive credentials only):
# RUSHTI_TEST_TM1_PASSWORD - The password for tm1srv01.
#
# Legacy (still supported):
# Secret: RUSHTI_TEST_TM1_CONFIG - Full config.ini content including password.
# If the variable + secret pair is configured, it takes precedence over the legacy secret.

name: Tests

@@ -103,15 +118,25 @@ jobs:
- name: Check TM1 configuration
id: check-tm1
run: |
if [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then
if [ -n "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" ] && [ -n "${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" ]; then
echo "tm1_configured=true" >> $GITHUB_OUTPUT
echo "config_source=variable" >> $GITHUB_OUTPUT
elif [ -n "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" ]; then
echo "tm1_configured=true" >> $GITHUB_OUTPUT
echo "config_source=legacy_secret" >> $GITHUB_OUTPUT
else
echo "tm1_configured=false" >> $GITHUB_OUTPUT
echo "::warning::TM1 config secret not configured. Integration tests will be skipped."
echo "::warning::TM1 config not configured. Integration tests will be skipped."
fi

- name: Create TM1 config file
if: steps.check-tm1.outputs.tm1_configured == 'true'
- name: Create TM1 config file (from variable + secret)
if: steps.check-tm1.outputs.config_source == 'variable'
run: |
echo "${{ vars.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini
echo "password = ${{ secrets.RUSHTI_TEST_TM1_PASSWORD }}" >> tests/config.ini

- name: Create TM1 config file (legacy - full secret)
if: steps.check-tm1.outputs.config_source == 'legacy_secret'
run: |
echo "${{ secrets.RUSHTI_TEST_TM1_CONFIG }}" > tests/config.ini
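
The source selection and the merge step above can also be sketched in Python. This is an illustration only, not what the workflow runs: `build_test_config` is a hypothetical helper, and it assumes the target section is named `tm1srv01` as in the header comment. Unlike appending a `password = ...` line to the end of the file, which only reaches the intended instance when that section comes last, the sketch sets the password on a named section.

```python
import configparser
import io
from typing import Optional


def build_test_config(
    config_var: Optional[str],
    password_secret: Optional[str],
    legacy_secret: Optional[str],
    section: str = "tm1srv01",
) -> Optional[str]:
    """Return config.ini text, or None when nothing is configured.

    Hypothetical helper: the variable + secret pair takes precedence over
    the legacy full-config secret, mirroring the workflow's check.
    """
    if config_var and password_secret:
        # interpolation=None keeps '%' characters in passwords literal.
        parser = configparser.ConfigParser(interpolation=None)
        parser.read_string(config_var)
        # Raises KeyError if the section is missing from the variable.
        parser[section]["password"] = password_secret
        buffer = io.StringIO()
        parser.write(buffer)
        return buffer.getvalue()
    if legacy_secret:
        return legacy_secret
    return None


merged = build_test_config(
    "[tm1srv01]\nbase_url = https://your-server/tm1/api/tm1\nuser = your_user\n",
    "s3cret%",
    None,
)
print(merged)
```

With neither the pair nor the legacy secret set, the helper returns `None`, which corresponds to the "integration tests skipped" branch.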

@@ -129,9 +154,15 @@ jobs:
- name: Skip message
if: steps.check-tm1.outputs.tm1_configured != 'true'
run: |
echo "Integration tests skipped - TM1 config secret not configured"
echo "To enable, add the RUSHTI_TEST_TM1_CONFIG secret to your repository."
echo "The secret should contain the full config.ini content with TM1 connection details."
echo "Integration tests skipped - TM1 config not configured"
echo ""
echo "To enable (recommended):"
echo " 1. Add variable RUSHTI_TEST_TM1_CONFIG with connection details (no password)"
echo " 2. Add secret RUSHTI_TEST_TM1_PASSWORD with the TM1 password"
echo ""
echo "Legacy (still supported):"
echo " Add secret RUSHTI_TEST_TM1_CONFIG with the full config.ini content"
echo ""
echo "See tests/config.ini.template for the expected format."

lint:
35 changes: 35 additions & 0 deletions docs/features/exclusive-mode.md
@@ -181,6 +181,41 @@ timeout = 1800 # Wait up to 30 minutes

---

## Early Session Release

When a workflow spans multiple TM1 instances, RushTI automatically releases sessions from instances that have no remaining tasks — without waiting for the entire workflow to finish. This is especially valuable in exclusive mode, where holding an idle session blocks other RushTI instances from accessing that server.

### Example

Consider a workflow with 100 tasks: 5 tasks on `tm1-finance` that take 10 seconds in total, followed by 95 tasks on `tm1-reporting` that take 30 minutes in total.

| Behavior | `tm1-finance` Locked For |
|----------|------------------------|
| **Without** early release | 30 min 10 sec (entire workflow) |
| **With** early release | 10 sec (only while its tasks run) |

Once the 5 tasks on `tm1-finance` complete, RushTI logs out from that instance immediately, freeing it for other workflows. The session on `tm1-reporting` continues until its tasks finish.
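
The figures in the table follow from simple arithmetic. The sketch below assumes, for illustration, that the `tm1-finance` tasks finish before the `tm1-reporting` tasks start and that the two groups do not overlap:

```python
finance_seconds = 10          # total runtime of the 5 tm1-finance tasks
reporting_seconds = 30 * 60   # total runtime of the 95 tm1-reporting tasks

# Without early release, every session is held until the workflow ends.
held_without = finance_seconds + reporting_seconds
# With early release, tm1-finance is held only while its own tasks run.
held_with = finance_seconds

print(divmod(held_without, 60))  # (30, 10), i.e. 30 min 10 sec
print(held_with)                 # 10
```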

### What You See in Logs

```
Executing task 5/100: RunExtract on tm1-finance
Early session release: logged out from tm1-finance (no remaining tasks)
Executing task 6/100: TransformData on tm1-reporting
...
```

### How It Works

- After each task completes, RushTI checks if any pending or running tasks remain for each connected TM1 instance.
- If an instance has zero remaining tasks, the session is closed immediately.
- In exclusive mode (`--exclusive`), even preserved connections (via `connection_file`) are released early.
- In normal mode, preserved connections are kept open for reuse across runs.

This feature is always active when RushTI runs from the command line; no configuration is needed.
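
The steps listed under "How It Works" can be sketched as a small simulation. This is a simplified illustration, not RushTI's implementation: the function name and its parameters are hypothetical, and tasks run one at a time here, whereas RushTI schedules them concurrently.

```python
from collections import Counter
from typing import Dict, List, Set, Tuple


def run_with_early_release(
    tasks: List[Tuple[str, str]],
    preserved: Dict[str, bool],
    exclusive: bool = False,
) -> List[str]:
    """Simulate sequential execution and return instances in release order.

    `tasks` is a list of (task_name, instance_name) pairs (hypothetical shape).
    """
    remaining = Counter(instance for _, instance in tasks)
    released: Set[str] = set()
    release_order: List[str] = []

    for _, instance in tasks:
        remaining[instance] -= 1  # this task has just finished
        for name, count in remaining.items():
            if count > 0 or name in released:
                continue
            # Preserved connections stay open unless exclusive mode forces release.
            if preserved.get(name, False) and not exclusive:
                continue
            released.add(name)
            release_order.append(name)
    return release_order


tasks = [("RunExtract", "tm1-finance")] * 5 + [("TransformData", "tm1-reporting")] * 95
print(run_with_early_release(tasks, preserved={}))
# ['tm1-finance', 'tm1-reporting']
```

Passing `preserved={"tm1-finance": True}` keeps that session open in normal mode, while adding `exclusive=True` releases it anyway.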

---

## When to Use Exclusive Mode

!!! tip "Use For"
2 changes: 2 additions & 0 deletions src/rushti/cli.py
@@ -1240,6 +1240,8 @@ def main() -> int:
checkpoint_manager=checkpoint_manager,
task_optimizer=task_optimizer,
stage_workers=stage_workers,
tm1_preserve_connections=preserve_connections,
force_logout=exclusive_mode,
)
)
success = True
17 changes: 17 additions & 0 deletions src/rushti/dag.py
@@ -303,6 +303,23 @@ def get_execution_results(self) -> Dict[str, bool]:
"""
return dict(self._results)

def get_remaining_tasks_by_instance(self) -> Dict[str, int]:
"""Get count of remaining (non-completed) tasks per TM1 instance.

A task counts as remaining unless its status is COMPLETED, FAILED, or SKIPPED (in practice, PENDING or RUNNING).

:return: Dictionary mapping instance_name to count of remaining tasks
"""
counts: Dict[str, int] = {}
for task_id, status in self._status.items():
if status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.SKIPPED):
continue
for task in self._tasks.get(task_id, []):
instance = getattr(task, "instance_name", None)
if instance:
counts[instance] = counts.get(instance, 0) + 1
return counts
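
A minimal sketch of how this method behaves, using stand-in types. `MiniDAG`, `Task`, and the `TaskStatus` enum below are hypothetical reductions written for this example; only the status names and the `_tasks`, `_status`, and `instance_name` attributes come from the diff.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, List


class TaskStatus(Enum):
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    SKIPPED = auto()


@dataclass
class Task:
    instance_name: str


class MiniDAG:
    """Hypothetical stand-in holding only the two attributes the method reads."""

    def __init__(self, tasks: Dict[str, List[Task]], status: Dict[str, TaskStatus]):
        self._tasks = tasks
        self._status = status

    def get_remaining_tasks_by_instance(self) -> Dict[str, int]:
        counts: Dict[str, int] = {}
        finished = (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.SKIPPED)
        for task_id, status in self._status.items():
            if status in finished:
                continue
            for task in self._tasks.get(task_id, []):
                instance = getattr(task, "instance_name", None)
                if instance:
                    counts[instance] = counts.get(instance, 0) + 1
        return counts


dag = MiniDAG(
    tasks={
        "1": [Task("tm1-finance")],
        "2": [Task("tm1-reporting")],
        "3": [Task("tm1-reporting")],
    },
    status={
        "1": TaskStatus.COMPLETED,
        "2": TaskStatus.RUNNING,
        "3": TaskStatus.PENDING,
    },
)
print(dag.get_remaining_tasks_by_instance())  # {'tm1-reporting': 2}
```

An instance with no remaining tasks is absent from the result rather than mapped to `0`, which is why the caller in `execution.py` tests `instance_name not in remaining`.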

def __len__(self) -> int:
"""Return the number of unique task IDs in the DAG."""
return len(self._tasks)
64 changes: 64 additions & 0 deletions src/rushti/execution.py
@@ -550,6 +550,8 @@ async def work_through_tasks_dag(
checkpoint_manager: "CheckpointManager" = None,
task_optimizer: "TaskOptimizer" = None,
stage_workers: Optional[Dict[str, int]] = None,
tm1_preserve_connections: Optional[Dict] = None,
force_logout: bool = False,
) -> List[bool]:
"""Execute tasks using DAG-based scheduling.

@@ -563,6 +565,11 @@ async def work_through_tasks_dag(
in addition to the global max_workers ceiling. The global max_workers
always takes precedence as the absolute cap.

When tm1_preserve_connections is provided, early session release is enabled:
after each task completion, instances with no remaining tasks are logged out
immediately to free TM1 server resources and release exclusive locks.
Instances marked as preserved are exempt unless force_logout is True.

:param ctx: The current execution context
:param dag: DAG containing tasks and their dependencies
:param max_workers: Maximum number of concurrent workers
@@ -571,11 +578,17 @@ async def work_through_tasks_dag(
:param checkpoint_manager: Optional CheckpointManager for resume support
:param task_optimizer: Optional TaskOptimizer for runtime-based scheduling
:param stage_workers: Optional per-stage worker limits (e.g. {"extract": 8, "load": 4})
:param tm1_preserve_connections: Optional dict indicating which connections to preserve.
When provided, enables early session release. Preserved connections are exempt
from early release unless force_logout is True.
:param force_logout: If True, force logout even from preserved connections (exclusive mode)
:return: List of execution outcomes (True/False for each task)
"""
outcomes = []
loop = asyncio.get_event_loop()
task_start_times: Dict[str, datetime] = {}
# Track instances that have been early-released to avoid double-logout
released_instances: set = set()

with ThreadPoolExecutor(int(max_workers)) as executor:
# Map futures to tasks
@@ -672,6 +685,23 @@ def submit_ready_tasks():
error_message=None if success else "Task failed",
)

# Early session release: logout from instances with no remaining tasks.
# Only active when tm1_preserve_connections is provided (i.e. called from CLI).
# Direct callers (e.g. integration tests) that don't pass it won't trigger
# early release, preserving shared connection state across test methods.
if tm1_preserve_connections is not None:
remaining = dag.get_remaining_tasks_by_instance()
for instance_name in list(tm1_services.keys()):
if instance_name not in remaining and instance_name not in released_instances:
released = _logout_instance(
instance_name,
tm1_services,
tm1_preserve_connections,
force_logout,
)
if released:
released_instances.add(instance_name)

# Submit newly ready tasks
submit_ready_tasks()

@@ -682,6 +712,40 @@ def submit_ready_tasks():
return outcomes


def _logout_instance(
instance_name: str,
tm1_services: Dict,
tm1_preserve_connections: Dict,
force: bool = False,
) -> bool:
"""Logout from a single TM1 instance.

Used for early session release when an instance has no remaining tasks.
Does NOT remove the instance from tm1_services — the caller tracks
which instances have been released to avoid double-release.

:param instance_name: Name of the TM1 instance to logout from
:param tm1_services: Dictionary of TM1Service instances
:param tm1_preserve_connections: Dictionary indicating which connections to preserve
:param force: If True, logout even from preserved connections
:return: True if logout was performed, False if skipped
"""
if instance_name not in tm1_services:
return False

if not force and tm1_preserve_connections.get(instance_name, False):
logger.debug(f"Preserving connection to {instance_name} (early release skipped)")
return False

try:
tm1_services[instance_name].logout()
logger.info(f"Early session release: logged out from {instance_name} (no remaining tasks)")
return True
except Exception as e:
logger.warning(f"Failed early logout from {instance_name}: {e}")
return False
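
The return values of `_logout_instance` can be exercised with a fake service object. In this sketch, `FakeService` and `logout_instance_sketch` are hypothetical: the function is a simplified copy of the decision logic above with logging removed, and the fake only counts `logout()` calls.

```python
from typing import Dict


class FakeService:
    """Hypothetical stand-in for a TM1 service object; counts logout calls."""

    def __init__(self) -> None:
        self.logout_calls = 0

    def logout(self) -> None:
        self.logout_calls += 1


def logout_instance_sketch(
    instance_name: str,
    services: Dict[str, FakeService],
    preserve: Dict[str, bool],
    force: bool = False,
) -> bool:
    """Return True if a logout was performed, False if it was skipped."""
    if instance_name not in services:
        return False
    if not force and preserve.get(instance_name, False):
        return False  # preserved connection, early release skipped
    try:
        services[instance_name].logout()
        return True
    except Exception:
        return False


services = {"tm1-finance": FakeService(), "tm1-reporting": FakeService()}
preserve = {"tm1-reporting": True}

print(logout_instance_sketch("tm1-finance", services, preserve))          # True
print(logout_instance_sketch("tm1-reporting", services, preserve))        # False (preserved)
print(logout_instance_sketch("tm1-reporting", services, preserve, True))  # True (forced)
print(logout_instance_sketch("unknown", services, preserve))              # False
```

Because a skipped or failed logout returns `False`, the caller does not add the instance to `released_instances`, so the check is repeated after the next task completes.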


def logout(
tm1_services: Dict,
tm1_preserve_connections: Dict,
11 changes: 9 additions & 2 deletions tests/config.ini.template
@@ -10,8 +10,15 @@
# 1. RUSHTI_TEST_CONFIG environment variable (path to a config.ini file)
# 2. tests/config.ini (this file, when copied and filled in)
#
# For CI/CD (GitHub Actions), set the RUSHTI_TEST_TM1_CONFIG secret to the
# full content of a config.ini file. The CI workflow writes it to tests/config.ini.
# For CI/CD (GitHub Actions), configure either:
#
# Recommended: Variable + Secret (connection details visible, password protected)
# - Variable RUSHTI_TEST_TM1_CONFIG: config.ini content WITHOUT password
# - Secret RUSHTI_TEST_TM1_PASSWORD: the password value only
# The CI workflow merges them into tests/config.ini at runtime.
#
# Legacy: Single secret (entire config opaque)
# - Secret RUSHTI_TEST_TM1_CONFIG: full config.ini content including password
#
# Multi-Instance Testing:
# You can define multiple TM1 instances. Tests that require a specific version