From e0b92ae578b83bfb7deddf7dc4eaa93e65a91a46 Mon Sep 17 00:00:00 2001 From: rafsanneloy Date: Wed, 4 Mar 2026 19:41:08 +0600 Subject: [PATCH 1/5] implementation of multi-session support and non-interactive login Signed-off-by: rafsanneloy --- cbrain_cli/cli_utils.py | 53 +++++- cbrain_cli/config.py | 4 +- cbrain_cli/data/projects.py | 111 +++++------ cbrain_cli/main.py | 356 ++++++++++++++++++++++++++++-------- cbrain_cli/sessions.py | 130 ++++++++----- 5 files changed, 456 insertions(+), 198 deletions(-) diff --git a/cbrain_cli/cli_utils.py b/cbrain_cli/cli_utils.py index cc8e7a6..c3a2633 100644 --- a/cbrain_cli/cli_utils.py +++ b/cbrain_cli/cli_utils.py @@ -1,22 +1,58 @@ import functools import json import re +import sys import urllib.error +from pathlib import Path # import importlib.metadata from cbrain_cli.config import CREDENTIALS_FILE +# Parse session name from arguments +session_name = "default" +session_specified = False +for i, arg in enumerate(sys.argv): + if arg == "--session" and i + 1 < len(sys.argv): + session_name = sys.argv[i + 1] + session_specified = True + elif arg.startswith("--session="): + session_name = arg.split("=", 1)[1] + session_specified = True + try: # MARK: Credentials. - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) + try: + with open(CREDENTIALS_FILE) as f: + all_credentials = json.load(f) + except FileNotFoundError: + # Fallback to older format location + old_file = Path.home() / ".config" / "cbrain" / "credentials.json" + if old_file.exists(): + with open(old_file) as f: + old_creds = json.load(f) + if "cbrain_url" in old_creds: + all_credentials = {"default": old_creds} + else: + all_credentials = old_creds + else: + all_credentials = {} + + if session_name in all_credentials: + credentials = all_credentials[session_name] + elif "cbrain_url" in all_credentials: + # Old format file at current location + credentials = all_credentials if session_name == "default" else {} + all_credentials = {"default": all_credentials} + else: + credentials = {} # Get credentials. cbrain_url = credentials.get("cbrain_url") api_token = credentials.get("api_token") user_id = credentials.get("user_id") cbrain_timestamp = credentials.get("timestamp") -except FileNotFoundError: +except Exception: + all_credentials = {} cbrain_url = None api_token = None user_id = None @@ -84,8 +120,8 @@ def handle_connection_error(error): if error.code == 401: print(f"{status_description}: {error.reason}") - print("Error: Access denied. Please log in using authorized credentials.") - elif error.code == 404 or error.code == 422 or error.code == 500: + print("Try with Authorized Access") + elif error.code in (400, 404, 422, 500): # Try to extract specific error message from response try: # Check if the error response has already been read @@ -107,6 +143,13 @@ def handle_connection_error(error): or error_data.get("notice") or str(error_data) ) + # Check if this looks like a password change redirect via API + if "change_password" in error_msg: + print( + f"{status_description}: Account requires a password change. " + "Please log into the web portal." + ) + return print(f"{status_description}: {error_msg}") return except json.JSONDecodeError: diff --git a/cbrain_cli/config.py b/cbrain_cli/config.py index cc74b5b..e5339bc 100644 --- a/cbrain_cli/config.py +++ b/cbrain_cli/config.py @@ -8,8 +8,8 @@ DEFAULT_BASE_URL = "http://localhost:3000" # Session file configuration. -SESSION_FILE_DIR = Path.home() / ".config" / "cbrain" -SESSION_FILE_NAME = "credentials.json" +SESSION_FILE_DIR = Path.home() / ".config" +SESSION_FILE_NAME = "cbrain.json" SESSION_FILE_DIR.mkdir(parents=True, exist_ok=True) CREDENTIALS_FILE = SESSION_FILE_DIR / SESSION_FILE_NAME diff --git a/cbrain_cli/data/projects.py b/cbrain_cli/data/projects.py index 58ff27b..0aaea61 100644 --- a/cbrain_cli/data/projects.py +++ b/cbrain_cli/data/projects.py @@ -26,18 +26,6 @@ def switch_project(args): print("Error: Group ID is required") return None - # Handle the special case of "all" - if group_id == "all": - print("Project switch 'all' not yet implemented as of Aug 2025") - return None - - # Convert to integer for regular group IDs - try: - group_id = int(group_id) - except ValueError: - print(f"Error: Invalid group ID '{group_id}'. Must be a number or 'all'") - return None - # Step 1: Call the switch API switch_endpoint = f"{cbrain_url}/groups/switch?id={group_id}" headers = auth_headers(api_token) @@ -55,86 +43,73 @@ def switch_project(args): group_data_text = group_response.read().decode("utf-8") group_data = json.loads(group_data_text) - # Step 3: Update credentials file with current group_id - if CREDENTIALS_FILE.exists(): - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) + # Step 3: Update credentials file with current group_id + from cbrain_cli.cli_utils import session_name + + if CREDENTIALS_FILE.exists(): + with open(CREDENTIALS_FILE) as f: + all_credentials_file = json.load(f) - credentials["current_group_id"] = group_id - credentials["current_group_name"] = group_data.get("name", "Unknown") + if session_name in all_credentials_file: + all_credentials_file[session_name]["current_group_id"] = group_id + all_credentials_file[session_name]["current_group_name"] = group_data.get( + "name", "Unknown" + ) with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) + json.dump(all_credentials_file, f, indent=2) - return group_data + return group_data def show_project(args): """ - Get the current project/group from credentials or show a specific project by ID. + Get the current project/group from credentials. Parameters ---------- args : argparse.Namespace - Command line arguments, may include project_id + Command line arguments Returns ------- dict or None Dictionary containing project details if successful, None if no project set """ - # Check if a specific project ID was provided - project_id = getattr(args, "project_id", None) - - if project_id: - # Show specific project by ID - group_endpoint = f"{cbrain_url}/groups/{project_id}" - headers = auth_headers(api_token) - request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") - - try: - with urllib.request.urlopen(request) as response: - data = response.read().decode("utf-8") - group_data = json.loads(data) - return group_data - except urllib.error.HTTPError as e: - if e.code == 404: - print(f"Error: Project with ID {project_id} not found") - return None - else: - raise - else: - # Show current project from credentials - with open(CREDENTIALS_FILE) as f: - credentials = json.load(f) + from cbrain_cli.cli_utils import session_name - current_group_id = credentials.get("current_group_id") - if not current_group_id: - return None + with open(CREDENTIALS_FILE) as f: + all_credentials_file = json.load(f) - # Get fresh group details from server - group_endpoint = f"{cbrain_url}/groups/{current_group_id}" - headers = auth_headers(api_token) + credentials = all_credentials_file.get(session_name, {}) + current_group_id = credentials.get("current_group_id") + if not current_group_id: + return None - request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") + # Get fresh group details from server + group_endpoint = f"{cbrain_url}/groups/{current_group_id}" + headers = auth_headers(api_token) - try: - with urllib.request.urlopen(request) as response: - data = response.read().decode("utf-8") - group_data = json.loads(data) - return group_data + request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") - except urllib.error.HTTPError as e: - if e.code == 404: - print(f"Error: Current project (ID {current_group_id}) no longer exists") - # Clear the invalid group_id from credentials - credentials.pop("current_group_id", None) - credentials.pop("current_group_name", None) + try: + with urllib.request.urlopen(request) as response: + data = response.read().decode("utf-8") + group_data = json.loads(data) + return group_data + + except urllib.error.HTTPError as e: + if e.code == 404: + print(f"Error: Current project (ID {current_group_id}) no longer exists") + # Clear the invalid group_id from credentials + if session_name in all_credentials_file: + all_credentials_file[session_name].pop("current_group_id", None) + all_credentials_file[session_name].pop("current_group_name", None) with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) - return None - else: - raise + json.dump(all_credentials_file, f, indent=2) + return None + else: + raise def list_projects(args): diff --git a/cbrain_cli/main.py b/cbrain_cli/main.py index 41e2d07..980191c 100644 --- a/cbrain_cli/main.py +++ b/cbrain_cli/main.py @@ -5,40 +5,64 @@ import argparse import sys -from cbrain_cli.cli_utils import handle_errors, is_authenticated, version_info -from cbrain_cli.data.tasks import operation_task -from cbrain_cli.handlers import ( - handle_background_list, - handle_background_show, - handle_dataprovider_delete_unregistered, - handle_dataprovider_is_alive, - handle_dataprovider_list, - handle_dataprovider_show, - handle_file_copy, - handle_file_delete, - handle_file_list, - handle_file_move, - handle_file_show, - handle_file_upload, - handle_project_list, - handle_project_show, - handle_project_switch, - handle_project_unswitch, - handle_remote_resource_list, - handle_remote_resource_show, - handle_tag_create, - handle_tag_delete, - handle_tag_list, - handle_tag_show, - handle_tag_update, - handle_task_list, - handle_task_show, - handle_tool_config_boutiques_descriptor, - handle_tool_config_list, - handle_tool_config_show, - handle_tool_list, - handle_tool_show, +from cbrain_cli.cli_utils import handle_errors, is_authenticated, json_printer, version_info +from cbrain_cli.data.background_activities import ( + list_background_activities, + show_background_activity, ) +from cbrain_cli.data.data_providers import ( + delete_unregistered_files, + is_alive, + list_data_providers, + show_data_provider, +) +from cbrain_cli.data.files import ( + copy_file, + delete_file, + list_files, + move_file, + show_file, + upload_file, +) +from cbrain_cli.data.projects import list_projects, show_project, switch_project +from cbrain_cli.data.remote_resources import list_remote_resources, show_remote_resource +from cbrain_cli.data.tags import create_tag, delete_tag, list_tags, show_tag, update_tag +from cbrain_cli.data.tasks import list_tasks, operation_task, show_task +from cbrain_cli.data.tool_configs import ( + list_tool_configs, + show_tool_config, + tool_config_boutiques_descriptor, +) +from cbrain_cli.data.tools import list_tools +from cbrain_cli.formatter.background_activities_fmt import ( + print_activities_list, + print_activity_details, +) +from cbrain_cli.formatter.data_providers_fmt import print_provider_details, print_providers_list +from cbrain_cli.formatter.files_fmt import ( + print_file_details, + print_files_list, + print_move_copy_result, + print_upload_result, +) +from cbrain_cli.formatter.projects_fmt import ( + print_current_project, + print_no_project, + print_projects_list, +) +from cbrain_cli.formatter.remote_resources_fmt import print_resource_details, print_resources_list +from cbrain_cli.formatter.tags_fmt import ( + print_tag_details, + print_tag_operation_result, + print_tags_list, +) +from cbrain_cli.formatter.tasks_fmt import print_task_data, print_task_details +from cbrain_cli.formatter.tool_configs_fmt import ( + print_boutiques_descriptor, + print_tool_config_details, + print_tool_configs_list, +) +from cbrain_cli.formatter.tools_fmt import print_tool_details, print_tools_list from cbrain_cli.sessions import create_session, logout_session from cbrain_cli.users import whoami_user @@ -60,6 +84,12 @@ def main(): action="store_true", help="Output in JSONL format (one JSON object per line)", ) + parser.add_argument( + "--session", + type=str, + default="default", + help="Session name to use for multiple configurations (default: default)", + ) subparsers = parser.add_subparsers(dest="command", help="Available commands") @@ -70,6 +100,9 @@ def main(): # MARK: Session commands (top-level) # Create new session. login_parser = subparsers.add_parser("login", help="Login to CBRAIN") + login_parser.add_argument("-u", "--username", type=str, help="CBRAIN username") + login_parser.add_argument("-p", "--password", type=str, help="CBRAIN password") + login_parser.add_argument("-s", "--server", type=str, help="CBRAIN server URL") login_parser.set_defaults(func=handle_errors(create_session)) # Logout session. @@ -97,12 +130,24 @@ def main(): file_list_parser.add_argument( "--per-page", type=int, default=25, help="Number of files per page (5-1000, default: 25)" ) - file_list_parser.set_defaults(func=handle_errors(handle_file_list)) + file_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_files_list(result, args) if result else None)( + list_files(args) + ) + ) + ) # file show file_show_parser = file_subparsers.add_parser("show", help="Show file details") file_show_parser.add_argument("file", type=int, help="File ID") - file_show_parser.set_defaults(func=handle_errors(handle_file_show)) + file_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_file_details(result, args) if result else None)( + show_file(args) + ) + ) + ) # file upload file_upload_parser = file_subparsers.add_parser("upload", help="Upload a file to CBRAIN") @@ -112,7 +157,11 @@ def main(): ) file_upload_parser.add_argument("--group-id", type=int, help="Group ID") - file_upload_parser.set_defaults(func=handle_errors(handle_file_upload)) + file_upload_parser.set_defaults( + func=handle_errors( + lambda args: print_upload_result(*result) if (result := upload_file(args)) else None + ) + ) # file copy file_copy_parser = file_subparsers.add_parser( @@ -128,7 +177,13 @@ def main(): file_copy_parser.add_argument( "--dp-id", type=int, required=True, help="Destination data provider ID" ) - file_copy_parser.set_defaults(func=handle_errors(handle_file_copy)) + file_copy_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_move_copy_result(*result, operation="copy") if result else None + )(copy_file(args)) + ) + ) # file move file_move_parser = file_subparsers.add_parser( @@ -144,12 +199,24 @@ def main(): file_move_parser.add_argument( "--dp-id", type=int, required=True, help="Destination data provider ID" ) - file_move_parser.set_defaults(func=handle_errors(handle_file_move)) + file_move_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_move_copy_result(*result, operation="move") if result else None + )(move_file(args)) + ) + ) # file delete file_delete_parser = file_subparsers.add_parser("delete", help="Delete a file") file_delete_parser.add_argument("file_id", type=int, help="ID of the file to delete") - file_delete_parser.set_defaults(func=handle_errors(handle_file_delete)) + file_delete_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: json_printer(result) if result else None)( + delete_file(args) + ) + ) + ) # Data provider commands dataprovider_parser = subparsers.add_parser("dataprovider", help="Data provider operations") @@ -161,7 +228,13 @@ def main(): dataprovider_list_parser = dataprovider_subparsers.add_parser( "list", help="List data providers" ) - dataprovider_list_parser.set_defaults(func=handle_errors(handle_dataprovider_list)) + dataprovider_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_providers_list(result, args))( + list_data_providers(args) + ) + ) + ) dataprovider_list_parser.add_argument( "--page", type=int, default=1, help="Page number (default: 1)" @@ -177,14 +250,22 @@ def main(): "show", help="Show data provider details" ) dataprovider_show_parser.add_argument("id", type=int, help="Data provider ID") - dataprovider_show_parser.set_defaults(func=handle_errors(handle_dataprovider_show)) + dataprovider_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_provider_details(result, args))( + show_data_provider(args) + ) + ) + ) # dataprovider is_alive dataprovider_is_alive_parser = dataprovider_subparsers.add_parser( "is-alive", help="Check if a data provider is alive" ) dataprovider_is_alive_parser.add_argument("id", type=int, help="Data provider ID") - dataprovider_is_alive_parser.set_defaults(func=handle_errors(handle_dataprovider_is_alive)) + dataprovider_is_alive_parser.set_defaults( + func=handle_errors(lambda args: (lambda result: json_printer(result))(is_alive(args))) + ) # dataprovider delete-unregistered-files dataprovider_delete_unregistered_files_parser = dataprovider_subparsers.add_parser( @@ -195,7 +276,9 @@ def main(): "id", type=int, help="Data provider ID" ) dataprovider_delete_unregistered_files_parser.set_defaults( - func=handle_errors(handle_dataprovider_delete_unregistered) + func=handle_errors( + lambda args: (lambda result: json_printer(result))(delete_unregistered_files(args)) + ) ) # Project commands @@ -204,27 +287,32 @@ def main(): # project list project_list_parser = project_subparsers.add_parser("list", help="List projects") - project_list_parser.set_defaults(func=handle_errors(handle_project_list)) + project_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_projects_list(result, args))(list_projects(args)) + ) + ) # project switch project_switch_parser = project_subparsers.add_parser("switch", help="Switch to a project") - project_switch_parser.add_argument("group_id", help="Project/Group ID or 'all'") - project_switch_parser.set_defaults(func=handle_errors(handle_project_switch)) - - # project show - project_show_parser = project_subparsers.add_parser( - "show", help="Show current project or specific project by ID" + project_switch_parser.add_argument("group_id", type=int, help="Project/Group ID") + project_switch_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_current_project(result) if result else None)( + switch_project(args) + ) + ) ) - project_show_parser.add_argument( - "project_id", type=int, nargs="?", help="Project ID to show (optional)" - ) - project_show_parser.set_defaults(func=handle_errors(handle_project_show)) - # project unswitch - project_unswitch_parser = project_subparsers.add_parser( - "unswitch", help="Unswitch from current project" + # project show + project_show_parser = project_subparsers.add_parser("show", help="Show current project") + project_show_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_current_project(result) if result else print_no_project() + )(show_project(args)) + ) ) - project_unswitch_parser.set_defaults(func=handle_errors(handle_project_unswitch)) # Tool commands tool_parser = subparsers.add_parser("tool", help="Tool operations") @@ -233,7 +321,13 @@ def main(): # tool show tool_show_parser = tool_subparsers.add_parser("show", help="Show tool details") tool_show_parser.add_argument("id", type=int, help="Tool ID") - tool_show_parser.set_defaults(func=handle_errors(handle_tool_show)) + tool_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_tool_details(result, args) if result else None)( + list_tools(args) + ) + ) + ) # tool list (reusing show_tool without id) tool_list_parser = tool_subparsers.add_parser("list", help="List all tools") @@ -241,7 +335,13 @@ def main(): tool_list_parser.add_argument( "--per-page", type=int, default=25, help="Number of tools per page (5-1000, default: 25)" ) - tool_list_parser.set_defaults(func=handle_errors(handle_tool_list)) + tool_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_tools_list(result, args) if result else None)( + list_tools(args) + ) + ) + ) ## MARK: tool-config commands tool_configs_parser = subparsers.add_parser("tool-config", help="Tool configuration operations") @@ -253,7 +353,13 @@ def main(): tool_configs_list_parser = tool_configs_subparsers.add_parser( "list", help="List all tool configurations" ) - tool_configs_list_parser.set_defaults(func=handle_errors(handle_tool_config_list)) + tool_configs_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_tool_configs_list(result, args))( + list_tool_configs(args) + ) + ) + ) tool_configs_list_parser.add_argument( "--page", type=int, default=1, help="Page number (default: 1)" @@ -270,7 +376,13 @@ def main(): "show", help="Show tool configuration details" ) tool_configs_show_parser.add_argument("id", type=int, help="Tool configuration ID") - tool_configs_show_parser.set_defaults(func=handle_errors(handle_tool_config_show)) + tool_configs_show_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_tool_config_details(result, args) if result else None + )(show_tool_config(args)) + ) + ) # tool-config boutiques-descriptor tool_configs_boutiques_parser = tool_configs_subparsers.add_parser( @@ -278,7 +390,11 @@ def main(): ) tool_configs_boutiques_parser.add_argument("id", type=int, help="Tool configuration ID") tool_configs_boutiques_parser.set_defaults( - func=handle_errors(handle_tool_config_boutiques_descriptor) + func=handle_errors( + lambda args: ( + lambda result: print_boutiques_descriptor(result, args) if result else None + )(tool_config_boutiques_descriptor(args)) + ) ) # Tag commands @@ -287,7 +403,11 @@ def main(): # tag list tag_list_parser = tag_subparsers.add_parser("list", help="List tags") - tag_list_parser.set_defaults(func=handle_errors(handle_tag_list)) + tag_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_tags_list(result, args))(list_tags(args)) + ) + ) tag_list_parser.add_argument("--page", type=int, default=1, help="Page number (default: 1)") tag_list_parser.add_argument( @@ -297,14 +417,30 @@ def main(): # tag show tag_show_parser = tag_subparsers.add_parser("show", help="Show tag details") tag_show_parser.add_argument("id", type=int, help="Tag ID") - tag_show_parser.set_defaults(func=handle_errors(handle_tag_show)) + tag_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_tag_details(result, args) if result else None)( + show_tag(args) + ) + ) + ) # tag create tag_create_parser = tag_subparsers.add_parser("create", help="Create a new tag") tag_create_parser.add_argument("--name", type=str, required=True, help="Tag name") tag_create_parser.add_argument("--user-id", type=int, required=True, help="User ID") tag_create_parser.add_argument("--group-id", type=int, required=True, help="Group ID") - tag_create_parser.set_defaults(func=handle_errors(handle_tag_create)) + tag_create_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_tag_operation_result( + "create", success=result[1], error_msg=result[2], response_status=result[3] + ) + if result + else None + )(create_tag(args)) + ) + ) # tag update tag_update_parser = tag_subparsers.add_parser("update", help="Update an existing tag") @@ -316,7 +452,21 @@ def main(): tag_update_parser.add_argument("--name", type=str, required=True, help="Tag name") tag_update_parser.add_argument("--user-id", type=int, required=True, help="User ID") tag_update_parser.add_argument("--group-id", type=int, required=True, help="Group ID") - tag_update_parser.set_defaults(func=handle_errors(handle_tag_update)) + tag_update_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_tag_operation_result( + "update", + tag_id=args.tag_id, + success=result[1], + error_msg=result[2], + response_status=result[3], + ) + if result + else None + )(update_tag(args)) + ) + ) # tag delete tag_delete_parser = tag_subparsers.add_parser("delete", help="Delete a tag") @@ -325,7 +475,21 @@ def main(): type=int, help="Tag ID to delete", ) - tag_delete_parser.set_defaults(func=handle_errors(handle_tag_delete)) + tag_delete_parser.set_defaults( + func=handle_errors( + lambda args: ( + lambda result: print_tag_operation_result( + "delete", + tag_id=args.tag_id, + success=result[0], + error_msg=result[1], + response_status=result[2], + ) + if result + else None + )(delete_tag(args)) + ) + ) # Background activity commands background_parser = subparsers.add_parser("background", help="Background activity operations") @@ -337,14 +501,26 @@ def main(): background_list_parser = background_subparsers.add_parser( "list", help="List background activities" ) - background_list_parser.set_defaults(func=handle_errors(handle_background_list)) + background_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_activities_list(result, args) if result else None)( + list_background_activities(args) + ) + ) + ) # background show background_show_parser = background_subparsers.add_parser( "show", help="Show background activity details" ) background_show_parser.add_argument("id", type=int, help="Background activity ID") - background_show_parser.set_defaults(func=handle_errors(handle_background_show)) + background_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_activity_details(result, args) if result else None)( + show_background_activity(args) + ) + ) + ) # Task commands task_parser = subparsers.add_parser("task", help="Task operations") @@ -365,12 +541,22 @@ def main(): nargs="?", help="Filter value (required if filter_type is specified)", ) - task_list_parser.set_defaults(func=handle_errors(handle_task_list)) + task_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_task_data(result, args))(list_tasks(args)) + ) + ) # task show task_show_parser = task_subparsers.add_parser("show", help="Show task details") task_show_parser.add_argument("task", type=int, help="Task ID") - task_show_parser.set_defaults(func=handle_errors(handle_task_show)) + task_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_task_details(result, args) if result else None)( + show_task(args) + ) + ) + ) # task operation task_operation_parser = task_subparsers.add_parser("operation", help="operation on a task") @@ -388,14 +574,26 @@ def main(): remote_resource_list_parser = remote_resource_subparsers.add_parser( "list", help="List remote resources" ) - remote_resource_list_parser.set_defaults(func=handle_errors(handle_remote_resource_list)) + remote_resource_list_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_resources_list(result, args))( + list_remote_resources(args) + ) + ) + ) # remote-resource show remote_resource_show_parser = remote_resource_subparsers.add_parser( "show", help="Show remote resource details" ) remote_resource_show_parser.add_argument("remote_resource", type=int, help="Remote resource ID") - remote_resource_show_parser.set_defaults(func=handle_errors(handle_remote_resource_show)) + remote_resource_show_parser.set_defaults( + func=handle_errors( + lambda args: (lambda result: print_resource_details(result, args) if result else None)( + show_remote_resource(args) + ) + ) + ) # MARK: Setup CLI args = parser.parse_args() @@ -404,21 +602,21 @@ def main(): parser.print_help() return - # Handle session commands (no authentication needed for login, version, and whoami). + # Handle public commands (no authentication needed). if args.command == "login": return handle_errors(create_session)(args) + elif args.command == "logout": + return handle_errors(logout_session)(args) elif args.command == "version": return handle_errors(version_info)(args) - elif args.command == "whoami": - return handle_errors(whoami_user)(args) # All other commands require authentication. if not is_authenticated(): return 1 # Handle authenticated commands. - if args.command == "logout": - return handle_errors(logout_session)(args) + if args.command == "whoami": + return handle_errors(whoami_user)(args) elif args.command in [ "file", "dataprovider", diff --git a/cbrain_cli/sessions.py b/cbrain_cli/sessions.py index 4559f52..6d2e391 100644 --- a/cbrain_cli/sessions.py +++ b/cbrain_cli/sessions.py @@ -5,7 +5,6 @@ import urllib.parse import urllib.request -from cbrain_cli.cli_utils import api_token, cbrain_url from cbrain_cli.config import ( CREDENTIALS_FILE, DEFAULT_BASE_URL, @@ -24,28 +23,31 @@ def create_session(args): None A command is run via inputs from the user. """ + from cbrain_cli.cli_utils import all_credentials, api_token, cbrain_url, session_name - if CREDENTIALS_FILE.exists(): - print("Already logged in. Use 'cbrain logout' to logout.") + if cbrain_url is not None and api_token is not None: + print(f"Already logged in to session '{session_name}'. Use 'cbrain logout' to logout.") return 1 # Get user input. - cbrain_url = input("Enter CBRAIN server base URL [default: localhost:3000]: ").strip() - if not cbrain_url: - cbrain_url = DEFAULT_BASE_URL + cbrain_url_input = getattr(args, "server", None) or input( + "Enter CBRAIN server base URL [default: localhost:3000]: " + ).strip() + if not cbrain_url_input: + cbrain_url_input = DEFAULT_BASE_URL - username = input("Enter CBRAIN username: ").strip() + username = getattr(args, "username", None) or input("Enter CBRAIN username: ").strip() if not username: print("Username is required") return 1 - password = getpass.getpass("Enter CBRAIN password: ") + password = getattr(args, "password", None) or getpass.getpass("Enter CBRAIN password: ") if not password: print("Password is required") return 1 # Prepare the login request. - login_endpoint = f"{cbrain_url}/session" + login_endpoint = f"{cbrain_url_input}/session" # Prepare form data. form_data = {"login": username, "password": password} @@ -73,17 +75,21 @@ def create_session(args): # Prepare credentials data. credentials = { - "cbrain_url": cbrain_url, + "cbrain_url": cbrain_url_input, "api_token": cbrain_api_token, "user_id": cbrain_user_id, "timestamp": datetime.datetime.now().isoformat(), } # Save credentials to file. + all_credentials[session_name] = credentials with open(CREDENTIALS_FILE, "w") as f: - json.dump(credentials, f, indent=2) + json.dump(all_credentials, f, indent=2) - print(f"Connection successful, API token saved in {CREDENTIALS_FILE}") + print( + f"Connection successful, API token saved in {CREDENTIALS_FILE} " + f"for session '{session_name}'" + ) return 0 @@ -97,42 +103,78 @@ def logout_session(args): None A command is run via inputs from the user. """ + from cbrain_cli.cli_utils import all_credentials, session_name, session_specified - if not cbrain_url or not api_token: - print("Invalid credentials file. Removing local session.") - CREDENTIALS_FILE.unlink() + if not session_specified and len(all_credentials) > 0: + sessions_to_logout = list(all_credentials.keys()) + else: + sessions_to_logout = [session_name] + + if not sessions_to_logout: + print("No active sessions to logout.") return 0 - # Prepare logout request. - logout_endpoint = f"{cbrain_url}/session" + for s_name in sessions_to_logout: + creds = all_credentials.get(s_name, {}) + s_url = creds.get("cbrain_url") + s_token = creds.get("api_token") + s_uid = creds.get("user_id") - # Create headers with authorization. - headers = auth_headers(api_token) + if not s_url or not s_token: + if s_name in all_credentials: + print(f"Invalid credentials for session '{s_name}'. Removing local session.") + del all_credentials[s_name] + else: + if session_specified: + print(f"Not logged in to session '{s_name}'.") + elif len(sessions_to_logout) == 1 and s_name == "default": + print("Not logged in. Use 'cbrain login' to login first.") + continue + + # Try to fetch username for a nicer logout message + username = s_name + try: + req = urllib.request.Request( + f"{s_url}/users/{s_uid}", headers=auth_headers(s_token), method="GET" + ) + with urllib.request.urlopen(req) as response: + user_data = json.loads(response.read().decode("utf-8")) + username = user_data.get("login", s_name) + except Exception: + pass + + # Prepare logout request. + logout_endpoint = f"{s_url}/session" + + # Create the DELETE request. + request = urllib.request.Request( + logout_endpoint, + data=None, # No payload for DELETE + headers=auth_headers(s_token), + method="DELETE", + ) + + # Make the request to logout from server. + try: + with urllib.request.urlopen(request) as response: + if response.status == 200: + print(f"Successfully logged out from CBRAIN server as {username}.") + else: + print(f"Logout failed for session '{s_name}'.") + except urllib.error.HTTPError as e: + if e.code == 401: + print(f"Session '{s_name}' already expired on server.") + else: + print(f"Logout request failed for '{s_name}': HTTP {e.code}") + except urllib.error.URLError as e: + print(f"Network error during logout for '{s_name}': {e}") - # Create the DELETE request. - request = urllib.request.Request( - logout_endpoint, - data=None, # No payload for DELETE - headers=headers, - method="DELETE", - ) + # Always remove local credentials for this session. + if s_name in all_credentials: + del all_credentials[s_name] + print(f"Local session '{s_name}' removed from {CREDENTIALS_FILE}") + + with open(CREDENTIALS_FILE, "w") as f: + json.dump(all_credentials, f, indent=2) - # Make the request to logout from server. - try: - with urllib.request.urlopen(request) as response: - if response.status == 200: - print("Successfully logged out from CBRAIN server.") - else: - print("Logout failed") - except urllib.error.HTTPError as e: - if e.code == 401: - print("Session already expired on server.") - else: - print(f"Logout request failed: HTTP {e.code}") - except urllib.error.URLError as e: - print(f"Network error during logout: {e}") - - # Always remove local credentials file. - CREDENTIALS_FILE.unlink() - print(f"Local session removed from {CREDENTIALS_FILE}") return 0 From 82b35acb08a3d1a21dd2a8cf9d6f7d5c66c79bd0 Mon Sep 17 00:00:00 2001 From: rafsanneloy Date: Thu, 5 Mar 2026 00:50:03 +0600 Subject: [PATCH 2/5] Roleback removed code Signed-off-by: rafsanneloy --- cbrain_cli/cli_utils.py | 7 +- cbrain_cli/data/projects.py | 113 +++++++----- cbrain_cli/main.py | 343 ++++++++---------------------------- cbrain_cli/sessions.py | 61 +++++-- 4 files changed, 198 insertions(+), 326 deletions(-) diff --git a/cbrain_cli/cli_utils.py b/cbrain_cli/cli_utils.py index c3a2633..3ddaf53 100644 --- a/cbrain_cli/cli_utils.py +++ b/cbrain_cli/cli_utils.py @@ -120,7 +120,7 @@ def handle_connection_error(error): if error.code == 401: print(f"{status_description}: {error.reason}") - print("Try with Authorized Access") + print("Error: Access denied. Please log in using authorized credentials.") elif error.code in (400, 404, 422, 500): # Try to extract specific error message from response try: @@ -143,10 +143,11 @@ def handle_connection_error(error): or error_data.get("notice") or str(error_data) ) - # Check if this looks like a password change redirect via API + # Check if this looks like a password change redirect if "change_password" in error_msg: print( - f"{status_description}: Account requires a password change. " + f"{status_description}: Account requires " + "a password change. " "Please log into the web portal." ) return diff --git a/cbrain_cli/data/projects.py b/cbrain_cli/data/projects.py index 0aaea61..b9166ad 100644 --- a/cbrain_cli/data/projects.py +++ b/cbrain_cli/data/projects.py @@ -20,12 +20,26 @@ def switch_project(args): dict or None Dictionary containing project details if successful, None otherwise """ + from cbrain_cli.cli_utils import all_credentials, session_name + # Get the group ID from the group_id argument group_id = getattr(args, "group_id", None) if not group_id: print("Error: Group ID is required") return None + # Handle the special case of "all" + if group_id == "all": + print("Project switch 'all' not yet implemented as of Aug 2025") + return None + + # Convert to integer for regular group IDs + try: + group_id = int(group_id) + except ValueError: + print(f"Error: Invalid group ID '{group_id}'. Must be a number or 'all'") + return None + # Step 1: Call the switch API switch_endpoint = f"{cbrain_url}/groups/switch?id={group_id}" headers = auth_headers(api_token) @@ -43,73 +57,85 @@ def switch_project(args): group_data_text = group_response.read().decode("utf-8") group_data = json.loads(group_data_text) - # Step 3: Update credentials file with current group_id - from cbrain_cli.cli_utils import session_name - - if CREDENTIALS_FILE.exists(): - with open(CREDENTIALS_FILE) as f: - all_credentials_file = json.load(f) - - if session_name in all_credentials_file: - all_credentials_file[session_name]["current_group_id"] = group_id - all_credentials_file[session_name]["current_group_name"] = group_data.get( + # Step 3: Update credentials file with current group_id + if session_name in all_credentials: + all_credentials[session_name]["current_group_id"] = group_id + all_credentials[session_name]["current_group_name"] = group_data.get( "name", "Unknown" ) with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials_file, f, indent=2) + json.dump(all_credentials, f, indent=2) - return group_data + return group_data def show_project(args): """ - Get the current project/group from credentials. + Get the current project/group from credentials or show a specific project by ID. Parameters ---------- args : argparse.Namespace - Command line arguments + Command line arguments, may include project_id Returns ------- dict or None Dictionary containing project details if successful, None if no project set """ - from cbrain_cli.cli_utils import session_name - - with open(CREDENTIALS_FILE) as f: - all_credentials_file = json.load(f) + from cbrain_cli.cli_utils import all_credentials, session_name + + # Check if a specific project ID was provided + project_id = getattr(args, "project_id", None) + + if project_id: + # Show specific project by ID + group_endpoint = f"{cbrain_url}/groups/{project_id}" + headers = auth_headers(api_token) + request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") + + try: + with urllib.request.urlopen(request) as response: + data = response.read().decode("utf-8") + group_data = json.loads(data) + return group_data + except urllib.error.HTTPError as e: + if e.code == 404: + print(f"Error: Project with ID {project_id} not found") + return None + else: + raise + else: + # Show current project from credentials + session_creds = all_credentials.get(session_name, {}) + current_group_id = session_creds.get("current_group_id") + if not current_group_id: + return None - credentials = all_credentials_file.get(session_name, {}) - current_group_id = credentials.get("current_group_id") - if not current_group_id: - return None + # Get fresh group details from server + group_endpoint = f"{cbrain_url}/groups/{current_group_id}" + headers = auth_headers(api_token) - # Get fresh group details from server - group_endpoint = f"{cbrain_url}/groups/{current_group_id}" - headers = auth_headers(api_token) + request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") - request = urllib.request.Request(group_endpoint, data=None, headers=headers, method="GET") + try: + with urllib.request.urlopen(request) as response: + data = response.read().decode("utf-8") + group_data = json.loads(data) + return group_data - try: - with urllib.request.urlopen(request) as response: - data = response.read().decode("utf-8") - group_data = json.loads(data) - return group_data - - except urllib.error.HTTPError as e: - if e.code == 404: - print(f"Error: Current project (ID {current_group_id}) no longer exists") - # Clear the invalid group_id from credentials - if session_name in all_credentials_file: - all_credentials_file[session_name].pop("current_group_id", None) - all_credentials_file[session_name].pop("current_group_name", None) + except urllib.error.HTTPError as e: + if e.code == 404: + print(f"Error: Current project (ID {current_group_id}) no longer exists") + # Clear the invalid group_id from credentials + session_creds.pop("current_group_id", None) + session_creds.pop("current_group_name", None) with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials_file, f, indent=2) - return None - else: - raise + json.dump(all_credentials, f, indent=2) + return None + else: + raise def list_projects(args): @@ -139,3 +165,4 @@ def list_projects(args): projects_data = json.loads(data) return projects_data + diff --git a/cbrain_cli/main.py b/cbrain_cli/main.py index 980191c..7d52b1c 100644 --- a/cbrain_cli/main.py +++ b/cbrain_cli/main.py @@ -5,64 +5,40 @@ import argparse import sys -from cbrain_cli.cli_utils import handle_errors, is_authenticated, json_printer, version_info -from cbrain_cli.data.background_activities import ( - list_background_activities, - show_background_activity, +from cbrain_cli.cli_utils import handle_errors, is_authenticated, version_info +from cbrain_cli.data.tasks import operation_task +from cbrain_cli.handlers import ( + handle_background_list, + handle_background_show, + handle_dataprovider_delete_unregistered, + handle_dataprovider_is_alive, + handle_dataprovider_list, + handle_dataprovider_show, + handle_file_copy, + handle_file_delete, + handle_file_list, + handle_file_move, + handle_file_show, + handle_file_upload, + handle_project_list, + handle_project_show, + handle_project_switch, + handle_project_unswitch, + handle_remote_resource_list, + handle_remote_resource_show, + handle_tag_create, + handle_tag_delete, + handle_tag_list, + handle_tag_show, + handle_tag_update, + handle_task_list, + handle_task_show, + handle_tool_config_boutiques_descriptor, + handle_tool_config_list, + handle_tool_config_show, + handle_tool_list, + handle_tool_show, ) -from cbrain_cli.data.data_providers import ( - delete_unregistered_files, - is_alive, - list_data_providers, - show_data_provider, -) -from cbrain_cli.data.files import ( - copy_file, - delete_file, - list_files, - move_file, - show_file, - upload_file, -) -from cbrain_cli.data.projects import list_projects, show_project, switch_project -from cbrain_cli.data.remote_resources import list_remote_resources, show_remote_resource -from cbrain_cli.data.tags import create_tag, delete_tag, list_tags, show_tag, update_tag -from cbrain_cli.data.tasks import list_tasks, operation_task, show_task -from cbrain_cli.data.tool_configs import ( - list_tool_configs, - show_tool_config, - tool_config_boutiques_descriptor, -) -from cbrain_cli.data.tools import list_tools -from cbrain_cli.formatter.background_activities_fmt import ( - print_activities_list, - print_activity_details, -) -from cbrain_cli.formatter.data_providers_fmt import print_provider_details, print_providers_list -from cbrain_cli.formatter.files_fmt import ( - print_file_details, - print_files_list, - print_move_copy_result, - print_upload_result, -) -from cbrain_cli.formatter.projects_fmt import ( - print_current_project, - print_no_project, - print_projects_list, -) -from cbrain_cli.formatter.remote_resources_fmt import print_resource_details, print_resources_list -from cbrain_cli.formatter.tags_fmt import ( - print_tag_details, - print_tag_operation_result, - print_tags_list, -) -from cbrain_cli.formatter.tasks_fmt import print_task_data, print_task_details -from cbrain_cli.formatter.tool_configs_fmt import ( - print_boutiques_descriptor, - print_tool_config_details, - print_tool_configs_list, -) -from cbrain_cli.formatter.tools_fmt import print_tool_details, print_tools_list from cbrain_cli.sessions import create_session, logout_session from cbrain_cli.users import whoami_user @@ -130,24 +106,12 @@ def main(): file_list_parser.add_argument( "--per-page", type=int, default=25, help="Number of files per page (5-1000, default: 25)" ) - file_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_files_list(result, args) if result else None)( - list_files(args) - ) - ) - ) + file_list_parser.set_defaults(func=handle_errors(handle_file_list)) # file show file_show_parser = file_subparsers.add_parser("show", help="Show file details") file_show_parser.add_argument("file", type=int, help="File ID") - file_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_file_details(result, args) if result else None)( - show_file(args) - ) - ) - ) + file_show_parser.set_defaults(func=handle_errors(handle_file_show)) # file upload file_upload_parser = file_subparsers.add_parser("upload", help="Upload a file to CBRAIN") @@ -157,11 +121,7 @@ def main(): ) file_upload_parser.add_argument("--group-id", type=int, help="Group ID") - file_upload_parser.set_defaults( - func=handle_errors( - lambda args: print_upload_result(*result) if (result := upload_file(args)) else None - ) - ) + file_upload_parser.set_defaults(func=handle_errors(handle_file_upload)) # file copy file_copy_parser = file_subparsers.add_parser( @@ -177,13 +137,7 @@ def main(): file_copy_parser.add_argument( "--dp-id", type=int, required=True, help="Destination data provider ID" ) - file_copy_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_move_copy_result(*result, operation="copy") if result else None - )(copy_file(args)) - ) - ) + file_copy_parser.set_defaults(func=handle_errors(handle_file_copy)) # file move file_move_parser = file_subparsers.add_parser( @@ -199,24 +153,12 @@ def main(): file_move_parser.add_argument( "--dp-id", type=int, required=True, help="Destination data provider ID" ) - file_move_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_move_copy_result(*result, operation="move") if result else None - )(move_file(args)) - ) - ) + file_move_parser.set_defaults(func=handle_errors(handle_file_move)) # file delete file_delete_parser = file_subparsers.add_parser("delete", help="Delete a file") file_delete_parser.add_argument("file_id", type=int, help="ID of the file to delete") - file_delete_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: json_printer(result) if result else None)( - delete_file(args) - ) - ) - ) + file_delete_parser.set_defaults(func=handle_errors(handle_file_delete)) # Data provider commands dataprovider_parser = subparsers.add_parser("dataprovider", help="Data provider operations") @@ -228,13 +170,7 @@ def main(): dataprovider_list_parser = dataprovider_subparsers.add_parser( "list", help="List data providers" ) - dataprovider_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_providers_list(result, args))( - list_data_providers(args) - ) - ) - ) + dataprovider_list_parser.set_defaults(func=handle_errors(handle_dataprovider_list)) dataprovider_list_parser.add_argument( "--page", type=int, default=1, help="Page number (default: 1)" @@ -250,22 +186,14 @@ def main(): "show", help="Show data provider details" ) dataprovider_show_parser.add_argument("id", type=int, help="Data provider ID") - dataprovider_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_provider_details(result, args))( - show_data_provider(args) - ) - ) - ) + dataprovider_show_parser.set_defaults(func=handle_errors(handle_dataprovider_show)) # dataprovider is_alive dataprovider_is_alive_parser = dataprovider_subparsers.add_parser( "is-alive", help="Check if a data provider is alive" ) dataprovider_is_alive_parser.add_argument("id", type=int, help="Data provider ID") - dataprovider_is_alive_parser.set_defaults( - func=handle_errors(lambda args: (lambda result: json_printer(result))(is_alive(args))) - ) + dataprovider_is_alive_parser.set_defaults(func=handle_errors(handle_dataprovider_is_alive)) # dataprovider delete-unregistered-files dataprovider_delete_unregistered_files_parser = dataprovider_subparsers.add_parser( @@ -276,9 +204,7 @@ def main(): "id", type=int, help="Data provider ID" ) dataprovider_delete_unregistered_files_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: json_printer(result))(delete_unregistered_files(args)) - ) + func=handle_errors(handle_dataprovider_delete_unregistered) ) # Project commands @@ -287,32 +213,27 @@ def main(): # project list project_list_parser = project_subparsers.add_parser("list", help="List projects") - project_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_projects_list(result, args))(list_projects(args)) - ) - ) + project_list_parser.set_defaults(func=handle_errors(handle_project_list)) # project switch project_switch_parser = project_subparsers.add_parser("switch", help="Switch to a project") - project_switch_parser.add_argument("group_id", type=int, help="Project/Group ID") - project_switch_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_current_project(result) if result else None)( - switch_project(args) - ) - ) - ) + project_switch_parser.add_argument("group_id", help="Project/Group ID or 'all'") + project_switch_parser.set_defaults(func=handle_errors(handle_project_switch)) # project show - project_show_parser = project_subparsers.add_parser("show", help="Show current project") - project_show_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_current_project(result) if result else print_no_project() - )(show_project(args)) - ) + project_show_parser = project_subparsers.add_parser( + "show", help="Show current project or specific project by ID" + ) + project_show_parser.add_argument( + "project_id", type=int, nargs="?", help="Project ID to show (optional)" ) + project_show_parser.set_defaults(func=handle_errors(handle_project_show)) + + # project unswitch + project_unswitch_parser = project_subparsers.add_parser( + "unswitch", help="Unswitch from current project" + ) + project_unswitch_parser.set_defaults(func=handle_errors(handle_project_unswitch)) # Tool commands tool_parser = subparsers.add_parser("tool", help="Tool operations") @@ -321,13 +242,7 @@ def main(): # tool show tool_show_parser = tool_subparsers.add_parser("show", help="Show tool details") tool_show_parser.add_argument("id", type=int, help="Tool ID") - tool_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_tool_details(result, args) if result else None)( - list_tools(args) - ) - ) - ) + tool_show_parser.set_defaults(func=handle_errors(handle_tool_show)) # tool list (reusing show_tool without id) tool_list_parser = tool_subparsers.add_parser("list", help="List all tools") @@ -335,13 +250,7 @@ def main(): tool_list_parser.add_argument( "--per-page", type=int, default=25, help="Number of tools per page (5-1000, default: 25)" ) - tool_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_tools_list(result, args) if result else None)( - list_tools(args) - ) - ) - ) + tool_list_parser.set_defaults(func=handle_errors(handle_tool_list)) ## MARK: tool-config commands tool_configs_parser = subparsers.add_parser("tool-config", help="Tool configuration operations") @@ -353,13 +262,7 @@ def main(): tool_configs_list_parser = tool_configs_subparsers.add_parser( "list", help="List all tool configurations" ) - tool_configs_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_tool_configs_list(result, args))( - list_tool_configs(args) - ) - ) - ) + tool_configs_list_parser.set_defaults(func=handle_errors(handle_tool_config_list)) tool_configs_list_parser.add_argument( "--page", type=int, default=1, help="Page number (default: 1)" @@ -376,13 +279,7 @@ def main(): "show", help="Show tool configuration details" ) tool_configs_show_parser.add_argument("id", type=int, help="Tool configuration ID") - tool_configs_show_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_tool_config_details(result, args) if result else None - )(show_tool_config(args)) - ) - ) + tool_configs_show_parser.set_defaults(func=handle_errors(handle_tool_config_show)) # tool-config boutiques-descriptor tool_configs_boutiques_parser = tool_configs_subparsers.add_parser( @@ -390,11 +287,7 @@ def main(): ) tool_configs_boutiques_parser.add_argument("id", type=int, help="Tool configuration ID") tool_configs_boutiques_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_boutiques_descriptor(result, args) if result else None - )(tool_config_boutiques_descriptor(args)) - ) + func=handle_errors(handle_tool_config_boutiques_descriptor) ) # Tag commands @@ -403,11 +296,7 @@ def main(): # tag list tag_list_parser = tag_subparsers.add_parser("list", help="List tags") - tag_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_tags_list(result, args))(list_tags(args)) - ) - ) + tag_list_parser.set_defaults(func=handle_errors(handle_tag_list)) tag_list_parser.add_argument("--page", type=int, default=1, help="Page number (default: 1)") tag_list_parser.add_argument( @@ -417,30 +306,14 @@ def main(): # tag show tag_show_parser = tag_subparsers.add_parser("show", help="Show tag details") tag_show_parser.add_argument("id", type=int, help="Tag ID") - tag_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_tag_details(result, args) if result else None)( - show_tag(args) - ) - ) - ) + tag_show_parser.set_defaults(func=handle_errors(handle_tag_show)) # tag create tag_create_parser = tag_subparsers.add_parser("create", help="Create a new tag") tag_create_parser.add_argument("--name", type=str, required=True, help="Tag name") tag_create_parser.add_argument("--user-id", type=int, required=True, help="User ID") tag_create_parser.add_argument("--group-id", type=int, required=True, help="Group ID") - tag_create_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_tag_operation_result( - "create", success=result[1], error_msg=result[2], response_status=result[3] - ) - if result - else None - )(create_tag(args)) - ) - ) + tag_create_parser.set_defaults(func=handle_errors(handle_tag_create)) # tag update tag_update_parser = tag_subparsers.add_parser("update", help="Update an existing tag") @@ -452,21 +325,7 @@ def main(): tag_update_parser.add_argument("--name", type=str, required=True, help="Tag name") tag_update_parser.add_argument("--user-id", type=int, required=True, help="User ID") tag_update_parser.add_argument("--group-id", type=int, required=True, help="Group ID") - tag_update_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_tag_operation_result( - "update", - tag_id=args.tag_id, - success=result[1], - error_msg=result[2], - response_status=result[3], - ) - if result - else None - )(update_tag(args)) - ) - ) + tag_update_parser.set_defaults(func=handle_errors(handle_tag_update)) # tag delete tag_delete_parser = tag_subparsers.add_parser("delete", help="Delete a tag") @@ -475,21 +334,7 @@ def main(): type=int, help="Tag ID to delete", ) - tag_delete_parser.set_defaults( - func=handle_errors( - lambda args: ( - lambda result: print_tag_operation_result( - "delete", - tag_id=args.tag_id, - success=result[0], - error_msg=result[1], - response_status=result[2], - ) - if result - else None - )(delete_tag(args)) - ) - ) + tag_delete_parser.set_defaults(func=handle_errors(handle_tag_delete)) # Background activity commands background_parser = subparsers.add_parser("background", help="Background activity operations") @@ -501,26 +346,14 @@ def main(): background_list_parser = background_subparsers.add_parser( "list", help="List background activities" ) - background_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_activities_list(result, args) if result else None)( - list_background_activities(args) - ) - ) - ) + background_list_parser.set_defaults(func=handle_errors(handle_background_list)) # background show background_show_parser = background_subparsers.add_parser( "show", help="Show background activity details" ) background_show_parser.add_argument("id", type=int, help="Background activity ID") - background_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_activity_details(result, args) if result else None)( - show_background_activity(args) - ) - ) - ) + background_show_parser.set_defaults(func=handle_errors(handle_background_show)) # Task commands task_parser = subparsers.add_parser("task", help="Task operations") @@ -541,22 +374,12 @@ def main(): nargs="?", help="Filter value (required if filter_type is specified)", ) - task_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_task_data(result, args))(list_tasks(args)) - ) - ) + task_list_parser.set_defaults(func=handle_errors(handle_task_list)) # task show task_show_parser = task_subparsers.add_parser("show", help="Show task details") task_show_parser.add_argument("task", type=int, help="Task ID") - task_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_task_details(result, args) if result else None)( - show_task(args) - ) - ) - ) + task_show_parser.set_defaults(func=handle_errors(handle_task_show)) # task operation task_operation_parser = task_subparsers.add_parser("operation", help="operation on a task") @@ -574,26 +397,14 @@ def main(): remote_resource_list_parser = remote_resource_subparsers.add_parser( "list", help="List remote resources" ) - remote_resource_list_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_resources_list(result, args))( - list_remote_resources(args) - ) - ) - ) + remote_resource_list_parser.set_defaults(func=handle_errors(handle_remote_resource_list)) # remote-resource show remote_resource_show_parser = remote_resource_subparsers.add_parser( "show", help="Show remote resource details" ) remote_resource_show_parser.add_argument("remote_resource", type=int, help="Remote resource ID") - remote_resource_show_parser.set_defaults( - func=handle_errors( - lambda args: (lambda result: print_resource_details(result, args) if result else None)( - show_remote_resource(args) - ) - ) - ) + remote_resource_show_parser.set_defaults(func=handle_errors(handle_remote_resource_show)) # MARK: Setup CLI args = parser.parse_args() @@ -609,15 +420,15 @@ def main(): return handle_errors(logout_session)(args) elif args.command == "version": return handle_errors(version_info)(args) + elif args.command == "whoami": + return handle_errors(whoami_user)(args) # All other commands require authentication. if not is_authenticated(): return 1 # Handle authenticated commands. - if args.command == "whoami": - return handle_errors(whoami_user)(args) - elif args.command in [ + if args.command in [ "file", "dataprovider", "project", diff --git a/cbrain_cli/sessions.py b/cbrain_cli/sessions.py index 6d2e391..ce17640 100644 --- a/cbrain_cli/sessions.py +++ b/cbrain_cli/sessions.py @@ -23,25 +23,37 @@ def create_session(args): None A command is run via inputs from the user. """ - from cbrain_cli.cli_utils import all_credentials, api_token, cbrain_url, session_name + from cbrain_cli.cli_utils import ( + all_credentials, + api_token, + cbrain_url, + session_name, + ) if cbrain_url is not None and api_token is not None: - print(f"Already logged in to session '{session_name}'. Use 'cbrain logout' to logout.") + print( + f"Already logged in to session '{session_name}'. " + "Use 'cbrain logout' to logout." + ) return 1 - # Get user input. + # Get user input (support non-interactive flags). cbrain_url_input = getattr(args, "server", None) or input( "Enter CBRAIN server base URL [default: localhost:3000]: " ).strip() if not cbrain_url_input: cbrain_url_input = DEFAULT_BASE_URL - username = getattr(args, "username", None) or input("Enter CBRAIN username: ").strip() + username = getattr(args, "username", None) or input( + "Enter CBRAIN username: " + ).strip() if not username: print("Username is required") return 1 - password = getattr(args, "password", None) or getpass.getpass("Enter CBRAIN password: ") + password = getattr(args, "password", None) or getpass.getpass( + "Enter CBRAIN password: " + ) if not password: print("Password is required") return 1 @@ -87,8 +99,8 @@ def create_session(args): json.dump(all_credentials, f, indent=2) print( - f"Connection successful, API token saved in {CREDENTIALS_FILE} " - f"for session '{session_name}'" + f"Connection successful, API token saved in " + f"{CREDENTIALS_FILE} for session '{session_name}'" ) return 0 @@ -98,12 +110,19 @@ def logout_session(args): """ Logout from CBRAIN by deleting the session file. + If --session is specified, logout only that session. + If no --session is specified, logout all active sessions. + Returns ------- None A command is run via inputs from the user. """ - from cbrain_cli.cli_utils import all_credentials, session_name, session_specified + from cbrain_cli.cli_utils import ( + all_credentials, + session_name, + session_specified, + ) if not session_specified and len(all_credentials) > 0: sessions_to_logout = list(all_credentials.keys()) @@ -122,20 +141,27 @@ def logout_session(args): if not s_url or not s_token: if s_name in all_credentials: - print(f"Invalid credentials for session '{s_name}'. Removing local session.") + print( + f"Invalid credentials for session '{s_name}'. " + "Removing local session." + ) del all_credentials[s_name] else: if session_specified: print(f"Not logged in to session '{s_name}'.") elif len(sessions_to_logout) == 1 and s_name == "default": - print("Not logged in. Use 'cbrain login' to login first.") + print( + "Not logged in. Use 'cbrain login' to login first." + ) continue # Try to fetch username for a nicer logout message username = s_name try: req = urllib.request.Request( - f"{s_url}/users/{s_uid}", headers=auth_headers(s_token), method="GET" + f"{s_url}/users/{s_uid}", + headers=auth_headers(s_token), + method="GET", ) with urllib.request.urlopen(req) as response: user_data = json.loads(response.read().decode("utf-8")) @@ -158,21 +184,28 @@ def logout_session(args): try: with urllib.request.urlopen(request) as response: if response.status == 200: - print(f"Successfully logged out from CBRAIN server as {username}.") + print( + "Successfully logged out from " + f"CBRAIN server as {username}." + ) else: print(f"Logout failed for session '{s_name}'.") except urllib.error.HTTPError as e: if e.code == 401: print(f"Session '{s_name}' already expired on server.") else: - print(f"Logout request failed for '{s_name}': HTTP {e.code}") + print( + f"Logout request failed for '{s_name}': HTTP {e.code}" + ) except urllib.error.URLError as e: print(f"Network error during logout for '{s_name}': {e}") # Always remove local credentials for this session. if s_name in all_credentials: del all_credentials[s_name] - print(f"Local session '{s_name}' removed from {CREDENTIALS_FILE}") + print( + f"Local session '{s_name}' removed from {CREDENTIALS_FILE}" + ) with open(CREDENTIALS_FILE, "w") as f: json.dump(all_credentials, f, indent=2) From 80f3664684e627c078e9476d9d43da220a8b718a Mon Sep 17 00:00:00 2001 From: RafsanNeloy Date: Wed, 11 Mar 2026 01:49:21 +0600 Subject: [PATCH 3/5] implemented switch, and manage named sessions. Signed-off-by: RafsanNeloy --- cbrain_cli/cli_utils.py | 39 ++--- cbrain_cli/config.py | 4 + cbrain_cli/data/projects.py | 10 +- cbrain_cli/main.py | 36 ++++- cbrain_cli/sessions.py | 282 ++++++++++++++++++------------------ cbrain_cli/users.py | 3 +- 6 files changed, 195 insertions(+), 179 deletions(-) diff --git a/cbrain_cli/cli_utils.py b/cbrain_cli/cli_utils.py index 3ddaf53..e721647 100644 --- a/cbrain_cli/cli_utils.py +++ b/cbrain_cli/cli_utils.py @@ -3,12 +3,11 @@ import re import sys import urllib.error -from pathlib import Path # import importlib.metadata -from cbrain_cli.config import CREDENTIALS_FILE +from cbrain_cli.config import ACTIVE_SESSION_KEY, CREDENTIALS_FILE -# Parse session name from arguments +# Session name priority: --session flag > _active_session in cbrain.json > "default" session_name = "default" session_specified = False for i, arg in enumerate(sys.argv): @@ -20,31 +19,18 @@ session_specified = True try: - # MARK: Credentials. try: with open(CREDENTIALS_FILE) as f: all_credentials = json.load(f) except FileNotFoundError: - # Fallback to older format location - old_file = Path.home() / ".config" / "cbrain" / "credentials.json" - if old_file.exists(): - with open(old_file) as f: - old_creds = json.load(f) - if "cbrain_url" in old_creds: - all_credentials = {"default": old_creds} - else: - all_credentials = old_creds - else: - all_credentials = {} - - if session_name in all_credentials: - credentials = all_credentials[session_name] - elif "cbrain_url" in all_credentials: - # Old format file at current location - credentials = all_credentials if session_name == "default" else {} - all_credentials = {"default": all_credentials} - else: - credentials = {} + all_credentials = {} + + if not session_specified: + session_name = all_credentials.get(ACTIVE_SESSION_KEY, "default") or "default" + + all_credentials.pop(ACTIVE_SESSION_KEY, None) + + credentials = all_credentials.get(session_name, {}) # Get credentials. cbrain_url = credentials.get("cbrain_url") @@ -53,10 +39,7 @@ cbrain_timestamp = credentials.get("timestamp") except Exception: all_credentials = {} - cbrain_url = None - api_token = None - user_id = None - cbrain_timestamp = None + cbrain_url = api_token = user_id = cbrain_timestamp = None def is_authenticated(): diff --git a/cbrain_cli/config.py b/cbrain_cli/config.py index e5339bc..c8e5554 100644 --- a/cbrain_cli/config.py +++ b/cbrain_cli/config.py @@ -13,6 +13,10 @@ SESSION_FILE_DIR.mkdir(parents=True, exist_ok=True) CREDENTIALS_FILE = SESSION_FILE_DIR / SESSION_FILE_NAME +# Key used inside cbrain.json to track the currently active session. +# Prefixed with "_" so it is clearly not a session name. +ACTIVE_SESSION_KEY = "_active_session" + # HTTP headers. DEFAULT_HEADERS = { "Content-Type": "application/x-www-form-urlencoded", diff --git a/cbrain_cli/data/projects.py b/cbrain_cli/data/projects.py index b9166ad..9099e53 100644 --- a/cbrain_cli/data/projects.py +++ b/cbrain_cli/data/projects.py @@ -3,7 +3,8 @@ import urllib.request from cbrain_cli.cli_utils import api_token, cbrain_url -from cbrain_cli.config import CREDENTIALS_FILE, auth_headers +from cbrain_cli.config import auth_headers +from cbrain_cli.sessions import save_credentials def switch_project(args): @@ -63,9 +64,7 @@ def switch_project(args): all_credentials[session_name]["current_group_name"] = group_data.get( "name", "Unknown" ) - - with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials, f, indent=2) + save_credentials(all_credentials) return group_data @@ -131,8 +130,7 @@ def show_project(args): # Clear the invalid group_id from credentials session_creds.pop("current_group_id", None) session_creds.pop("current_group_name", None) - with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials, f, indent=2) + save_credentials(all_credentials) return None else: raise diff --git a/cbrain_cli/main.py b/cbrain_cli/main.py index 7d52b1c..303bb7e 100644 --- a/cbrain_cli/main.py +++ b/cbrain_cli/main.py @@ -39,7 +39,7 @@ handle_tool_list, handle_tool_show, ) -from cbrain_cli.sessions import create_session, logout_session +from cbrain_cli.sessions import create_session, list_sessions, logout_session, switch_session from cbrain_cli.users import whoami_user @@ -76,6 +76,7 @@ def main(): # MARK: Session commands (top-level) # Create new session. login_parser = subparsers.add_parser("login", help="Login to CBRAIN") + login_parser.add_argument("--session", type=str, help="Session name to use") login_parser.add_argument("-u", "--username", type=str, help="CBRAIN username") login_parser.add_argument("-p", "--password", type=str, help="CBRAIN password") login_parser.add_argument("-s", "--server", type=str, help="CBRAIN server URL") @@ -83,13 +84,39 @@ def main(): # Logout session. logout_parser = subparsers.add_parser("logout", help="Logout from CBRAIN") + logout_parser.add_argument( + "--session", type=str, help="Session name to logout (default: all sessions)" + ) logout_parser.set_defaults(func=handle_errors(logout_session)) # Show current session. whoami_parser = subparsers.add_parser("whoami", help="Show current session") + whoami_parser.add_argument("--session", type=str, help="Session name to show") whoami_parser.add_argument("-v", "--version", action="store_true", help="Show version") whoami_parser.set_defaults(func=handle_errors(whoami_user)) + # Switch active session. + switch_session_parser = subparsers.add_parser( + "switch_session", + help="Switch the default session (e.g. cbrain switch_session prod)", + ) + switch_session_parser.add_argument( + "session_target", + type=str, + help="Name of the session to make the default", + ) + switch_session_parser.set_defaults(func=handle_errors(switch_session)) + + # Session management sub-commands. + session_parser = subparsers.add_parser("session", help="Session management") + session_subparsers = session_parser.add_subparsers( + dest="action", help="Session actions" + ) + session_list_parser = session_subparsers.add_parser( + "list", help="List all saved sessions" + ) + session_list_parser.set_defaults(func=handle_errors(list_sessions)) + # MARK: Model-based commands # File commands file_parser = subparsers.add_parser("file", help="File operations") @@ -422,6 +449,13 @@ def main(): return handle_errors(version_info)(args) elif args.command == "whoami": return handle_errors(whoami_user)(args) + elif args.command == "switch_session": + return handle_errors(switch_session)(args) + elif args.command == "session": + if not getattr(args, "action", None): + session_parser.print_help() + return 1 + return args.func(args) # All other commands require authentication. if not is_authenticated(): diff --git a/cbrain_cli/sessions.py b/cbrain_cli/sessions.py index ce17640..0534b40 100644 --- a/cbrain_cli/sessions.py +++ b/cbrain_cli/sessions.py @@ -6,208 +6,204 @@ import urllib.request from cbrain_cli.config import ( + ACTIVE_SESSION_KEY, CREDENTIALS_FILE, DEFAULT_BASE_URL, DEFAULT_HEADERS, auth_headers, ) +## MARK: Internal helpers -# MARK: Create Session. -def create_session(args): - """ - Create a new CBRAIN session by logging in and saving credentials. +def load_credentials() -> dict: + """Load cbrain.json; return {} if missing, raise on corrupt JSON.""" + try: + with open(CREDENTIALS_FILE) as f: + return json.load(f) + except FileNotFoundError: + return {} - Returns - ------- - None - A command is run via inputs from the user. - """ - from cbrain_cli.cli_utils import ( - all_credentials, - api_token, - cbrain_url, - session_name, - ) - if cbrain_url is not None and api_token is not None: +def save_credentials(data: dict) -> None: + """Merge *data* into cbrain.json, preserving metadata keys (e.g. _active_session).""" + on_disk = load_credentials() # always re-read so we don't lose metadata + on_disk.update(data) # overlay the caller's session changes + with open(CREDENTIALS_FILE, "w") as f: + json.dump(on_disk, f, indent=2) + + +def get_sessions(all_creds: dict) -> dict: + """Return only genuine session entries (skip the metadata key).""" + return {name: creds for name, creds in all_creds.items() if name != ACTIVE_SESSION_KEY} + + +# MARK: Switch Session + +def switch_session(args): + """Switch the default session used by bare commands.""" + target = getattr(args, "session_target", None) + if not target: + print("Usage: cbrain switch_session ") + return 1 + + try: + all_creds = load_credentials() + except json.JSONDecodeError: + print(f"Error: credentials file is corrupted ({CREDENTIALS_FILE}).") + return 1 + + sessions = get_sessions(all_creds) + if target not in sessions: + available = ", ".join(sessions) or "(none)" + print(f"Session '{target}' not found. Available sessions: {available}") + return 1 + + all_creds[ACTIVE_SESSION_KEY] = target + save_credentials(all_creds) + print(f"Switched to session '{target}'. All future commands will use this session.") + return 0 + + +# MARK: List Sessions + +def list_sessions(args): + """List all saved sessions, marking the currently active one with '*'.""" + try: + all_creds = load_credentials() + except json.JSONDecodeError: + print(f"Error: credentials file is corrupted ({CREDENTIALS_FILE}).") + return 1 + + active = all_creds.get(ACTIVE_SESSION_KEY, "default") + sessions = get_sessions(all_creds) + + if not sessions: + print("No saved sessions. Use 'cbrain login' to create one.") + return 0 + + print(f"{'#':<4} {'SESSION':<20} {'USERNAME':<16} {'USER ID':<10} {'SERVER':<35} {'TIMESTAMP'}") + print("-" * 90) + for idx, (name, c) in enumerate(sessions.items(), start=1): + marker = "*" if name == active else " " print( - f"Already logged in to session '{session_name}'. " - "Use 'cbrain logout' to logout." + f"{marker}{idx:<3} {name:<20} {c.get('username', '(unknown)'):<16} " + f"{c.get('user_id', 'N/A')!s:<10} {c.get('cbrain_url', 'N/A'):<35} " + f"{c.get('timestamp', 'N/A')}" ) + + print(f"\nActive session: {active} (* = active)") + return 0 + + +# MARK: Create Session + +def create_session(args): + """Login to CBRAIN and save credentials for the current session.""" + from cbrain_cli.cli_utils import all_credentials, api_token, cbrain_url, session_name + + if cbrain_url and api_token: + print(f"Already logged in to session '{session_name}'. Use 'cbrain logout' to logout.") return 1 - # Get user input (support non-interactive flags). - cbrain_url_input = getattr(args, "server", None) or input( + server = getattr(args, "server", None) or input( "Enter CBRAIN server base URL [default: localhost:3000]: " - ).strip() - if not cbrain_url_input: - cbrain_url_input = DEFAULT_BASE_URL + ).strip() or DEFAULT_BASE_URL - username = getattr(args, "username", None) or input( - "Enter CBRAIN username: " - ).strip() + username = getattr(args, "username", None) or input("Enter CBRAIN username: ").strip() if not username: print("Username is required") return 1 - password = getattr(args, "password", None) or getpass.getpass( - "Enter CBRAIN password: " - ) + password = getattr(args, "password", None) or getpass.getpass("Enter CBRAIN password: ") if not password: print("Password is required") return 1 - # Prepare the login request. - login_endpoint = f"{cbrain_url_input}/session" - - # Prepare form data. - form_data = {"login": username, "password": password} - - # Encode the form data. - encoded_data = urllib.parse.urlencode(form_data).encode("utf-8") - - # Create the request. + encoded = urllib.parse.urlencode({"login": username, "password": password}).encode() request = urllib.request.Request( - login_endpoint, data=encoded_data, headers=DEFAULT_HEADERS, method="POST" + f"{server}/session", data=encoded, headers=DEFAULT_HEADERS, method="POST" ) - # Make the request. - with urllib.request.urlopen(request) as response: - data = response.read().decode("utf-8") - response_data = json.loads(data) - - # Extract the API token from response. - cbrain_api_token = response_data.get("cbrain_api_token") - cbrain_user_id = response_data.get("user_id") - - if not cbrain_api_token: + with urllib.request.urlopen(request) as resp: + data = json.loads(resp.read()) + token = data.get("cbrain_api_token") + if not token: print("Login failed: No API token received") return 1 - # Prepare credentials data. - credentials = { - "cbrain_url": cbrain_url_input, - "api_token": cbrain_api_token, - "user_id": cbrain_user_id, + all_credentials[session_name] = { + "cbrain_url": server, + "api_token": token, + "user_id": data.get("user_id"), + "username": username, "timestamp": datetime.datetime.now().isoformat(), } + save_credentials(all_credentials) - # Save credentials to file. - all_credentials[session_name] = credentials - with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials, f, indent=2) - - print( - f"Connection successful, API token saved in " - f"{CREDENTIALS_FILE} for session '{session_name}'" - ) - return 0 + print(f"Connection successful. Token saved in {CREDENTIALS_FILE} for session '{session_name}'.") + return 0 # MARK: Logout + def logout_session(args): """ - Logout from CBRAIN by deleting the session file. + Logout from CBRAIN. - If --session is specified, logout only that session. - If no --session is specified, logout all active sessions. - - Returns - ------- - None - A command is run via inputs from the user. + Without ``--session``: logout all active sessions. + With ``--session ``: logout only that session. """ - from cbrain_cli.cli_utils import ( - all_credentials, - session_name, - session_specified, - ) + from cbrain_cli.cli_utils import session_name, session_specified - if not session_specified and len(all_credentials) > 0: - sessions_to_logout = list(all_credentials.keys()) - else: - sessions_to_logout = [session_name] + # Load a fresh, unstripped copy from disk so _active_session is preserved. + all_creds = load_credentials() + sessions = get_sessions(all_creds) + + sessions_to_logout = list(sessions) if not session_specified else [session_name] if not sessions_to_logout: print("No active sessions to logout.") return 0 for s_name in sessions_to_logout: - creds = all_credentials.get(s_name, {}) - s_url = creds.get("cbrain_url") - s_token = creds.get("api_token") - s_uid = creds.get("user_id") + creds = sessions.get(s_name, {}) + s_url, s_token = creds.get("cbrain_url"), creds.get("api_token") if not s_url or not s_token: - if s_name in all_credentials: - print( - f"Invalid credentials for session '{s_name}'. " - "Removing local session." - ) - del all_credentials[s_name] - else: - if session_specified: - print(f"Not logged in to session '{s_name}'.") - elif len(sessions_to_logout) == 1 and s_name == "default": - print( - "Not logged in. Use 'cbrain login' to login first." - ) + if s_name in sessions: + print(f"Invalid credentials for session '{s_name}'. Removing local session.") + all_creds.pop(s_name, None) + elif session_specified: + print(f"Not logged in to session '{s_name}'.") + elif len(sessions_to_logout) == 1: + print("Not logged in. Use 'cbrain login' to login first.") continue - # Try to fetch username for a nicer logout message - username = s_name + # Use the stored username for the logout message (no extra network call needed). + display_name = creds.get("username", s_name) + try: req = urllib.request.Request( - f"{s_url}/users/{s_uid}", - headers=auth_headers(s_token), - method="GET", + f"{s_url}/session", headers=auth_headers(s_token), method="DELETE" ) - with urllib.request.urlopen(req) as response: - user_data = json.loads(response.read().decode("utf-8")) - username = user_data.get("login", s_name) - except Exception: - pass - - # Prepare logout request. - logout_endpoint = f"{s_url}/session" - - # Create the DELETE request. - request = urllib.request.Request( - logout_endpoint, - data=None, # No payload for DELETE - headers=auth_headers(s_token), - method="DELETE", - ) - - # Make the request to logout from server. - try: - with urllib.request.urlopen(request) as response: - if response.status == 200: - print( - "Successfully logged out from " - f"CBRAIN server as {username}." - ) + with urllib.request.urlopen(req) as resp: + if resp.status == 200: + print(f"Successfully logged out from CBRAIN server as {display_name}.") else: print(f"Logout failed for session '{s_name}'.") except urllib.error.HTTPError as e: - if e.code == 401: - print(f"Session '{s_name}' already expired on server.") - else: - print( - f"Logout request failed for '{s_name}': HTTP {e.code}" - ) + print( + f"Session '{s_name}' already expired on server." + if e.code == 401 + else f"Logout request failed for '{s_name}': HTTP {e.code}" + ) except urllib.error.URLError as e: print(f"Network error during logout for '{s_name}': {e}") - # Always remove local credentials for this session. - if s_name in all_credentials: - del all_credentials[s_name] - print( - f"Local session '{s_name}' removed from {CREDENTIALS_FILE}" - ) + all_creds.pop(s_name, None) + print(f"Local session '{s_name}' removed from {CREDENTIALS_FILE}.") with open(CREDENTIALS_FILE, "w") as f: - json.dump(all_credentials, f, indent=2) - + json.dump(all_creds, f, indent=2) return 0 + diff --git a/cbrain_cli/users.py b/cbrain_cli/users.py index d8bcaaf..b2d67f2 100644 --- a/cbrain_cli/users.py +++ b/cbrain_cli/users.py @@ -79,7 +79,7 @@ def whoami_user(args): else "****" ) - print(f"DEBUG: Found credentials {CREDENTIALS_FILE}") + print(f"DEBUG: Found credentials at {CREDENTIALS_FILE}") print(f"DEBUG: User in credentials: {user_data['login']} on server {cbrain_url}") print(f"DEBUG: Token found: {masked_token}") print("DEBUG: Verifying token...") @@ -121,3 +121,4 @@ def whoami_user(args): return 1 print(f"Current user: {user_data['login']} ({user_data['full_name']}) on server {cbrain_url}") + return 0 From 7da825a3faa0df953ead4c7d8833110870cc0fcb Mon Sep 17 00:00:00 2001 From: RafsanNeloy Date: Sat, 14 Mar 2026 05:34:53 +0600 Subject: [PATCH 4/5] Update session file directory and renamed to `credentials.json`. Signed-off-by: RafsanNeloy --- cbrain_cli/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cbrain_cli/config.py b/cbrain_cli/config.py index c8e5554..6f32008 100644 --- a/cbrain_cli/config.py +++ b/cbrain_cli/config.py @@ -8,12 +8,12 @@ DEFAULT_BASE_URL = "http://localhost:3000" # Session file configuration. -SESSION_FILE_DIR = Path.home() / ".config" -SESSION_FILE_NAME = "cbrain.json" +SESSION_FILE_DIR = Path.home() / ".config" / "cbrain-cli" +SESSION_FILE_NAME = "credentials.json" SESSION_FILE_DIR.mkdir(parents=True, exist_ok=True) CREDENTIALS_FILE = SESSION_FILE_DIR / SESSION_FILE_NAME -# Key used inside cbrain.json to track the currently active session. +# Key used inside credentials.json to track the currently active session. # Prefixed with "_" so it is clearly not a session name. ACTIVE_SESSION_KEY = "_active_session" From ddce7c538e244c639dd8e50ceb26b4c79ede03c0 Mon Sep 17 00:00:00 2001 From: RafsanNeloy Date: Sun, 15 Mar 2026 03:32:33 +0600 Subject: [PATCH 5/5] renamed "cbrain-cli" to "cbrain". Signed-off-by: RafsanNeloy --- cbrain_cli/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cbrain_cli/config.py b/cbrain_cli/config.py index 6f32008..8405d32 100644 --- a/cbrain_cli/config.py +++ b/cbrain_cli/config.py @@ -8,7 +8,7 @@ DEFAULT_BASE_URL = "http://localhost:3000" # Session file configuration. -SESSION_FILE_DIR = Path.home() / ".config" / "cbrain-cli" +SESSION_FILE_DIR = Path.home() / ".config" / "cbrain" SESSION_FILE_NAME = "credentials.json" SESSION_FILE_DIR.mkdir(parents=True, exist_ok=True) CREDENTIALS_FILE = SESSION_FILE_DIR / SESSION_FILE_NAME