From 6be9b5a2d9afbd968c65f668ac8010313d3e8fa2 Mon Sep 17 00:00:00 2001 From: valhayot Date: Mon, 19 Sep 2022 20:51:23 +0000 Subject: [PATCH 01/31] changes to enable margo --- psbench/proxystore.py | 16 ++++++++++++---- psbench/tasks/pong.py | 12 ++++++------ setup.cfg | 1 - 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/psbench/proxystore.py b/psbench/proxystore.py index 871209c..c902ea5 100644 --- a/psbench/proxystore.py +++ b/psbench/proxystore.py @@ -33,14 +33,14 @@ def init_store_from_args( if args.ps_backend == STORES.ENDPOINT.name: store = init_store( STORES.ENDPOINT, - name='endpoint-store', + name="endpoint-store", endpoints=args.ps_endpoints, **kwargs, ) elif args.ps_backend == STORES.FILE.name: store = init_store( STORES.FILE, - name='file-store', + name="file-store", store_dir=args.ps_file_dir, **kwargs, ) @@ -48,17 +48,25 @@ def init_store_from_args( endpoints = GlobusEndpoints.from_json(args.ps_globus_config) store = init_store( STORES.GLOBUS, - name='globus-store', + name="globus-store", endpoints=endpoints, **kwargs, ) elif args.ps_backend == STORES.REDIS.name: store = init_store( STORES.REDIS, - name='redis-store', + name="redis-store", hostname=args.ps_redis_host, port=args.ps_redis_port, **kwargs, ) + elif args.ps_backend == STORES.MARGO.name: + store = init_store( + STORES.MARGO, + name="margo-store", + port=args.ps_redis_port, + **kwargs, + ) + return store diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py index ecce21d..eae4c4c 100644 --- a/psbench/tasks/pong.py +++ b/psbench/tasks/pong.py @@ -62,7 +62,7 @@ def pong_proxy( from proxystore.proxy import Proxy from proxystore.proxy import is_resolved - from proxystore.proxy import resolve_async + from proxystore.store import resolve_async from proxystore.store import get_store from proxystore.store import UnknownStoreError @@ -81,16 +81,16 @@ def pong_proxy( store = get_store(data) if store is None: # pragma: no cover # init_store does not return None in ProxyStore <= 0.3.3 - raise UnknownStoreError('Cannot find ProxyStore backend to use.') + raise UnknownStoreError("Cannot find ProxyStore backend to use.") result: Proxy[bytes] = store.proxy(result_data, evict=evict_result) stats: ProxyStats | None = None if store.has_stats: stats = ProxyStats( - input_get_ms=store.stats(data)['get'].avg_time_ms, - input_resolve_ms=store.stats(data)['resolve'].avg_time_ms, - output_set_ms=store.stats(result)['set'].avg_time_ms, - output_proxy_ms=store.stats(result)['proxy'].avg_time_ms, + input_get_ms=store.stats(data)["get"].avg_time_ms, + input_resolve_ms=store.stats(data)["resolve"].avg_time_ms, + output_set_ms=store.stats(result)["set"].avg_time_ms, + output_proxy_ms=store.stats(result)["proxy"].avg_time_ms, ) return (result, stats) diff --git a/setup.cfg b/setup.cfg index b9ecbe0..b7ad759 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,7 +20,6 @@ packages = find: install_requires = funcx==1.0.* funcx-endpoint==1.0.* - proxystore[endpoints]==0.4.0a2 requests python_requires = >=3.7 include_package_data = True From 1a32670ed54fc89f252bb6f9573fcfc331997c08 Mon Sep 17 00:00:00 2001 From: valhayot Date: Fri, 7 Oct 2022 02:48:57 +0000 Subject: [PATCH 02/31] added intrasite updates to benchmarks --- psbench/argparse.py | 13 +++++++++++++ psbench/proxystore.py | 9 +++++---- psbench/tasks/pong.py | 4 ++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/psbench/argparse.py b/psbench/argparse.py index 7bbc644..bafaf92 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -116,3 +116,16 @@ def 
add_proxystore_options(
         required=bool(re.search('--ps-backend( |=)REDIS', args_str)),
         help='Port of Redis server to use with ProxyStore',
     )
+    group.add_argument(
+        '--ps-intrasite-interface',
+        metavar='IFACE',
+        required=bool(re.search('--ps-backend( |=)INTRASITE', args_str)),
+        help='Network interface to use with ProxyStore',
+    )
+    group.add_argument(
+        '--ps-intrasite-port',
+        metavar='PORT',
+        type=int,
+        required=bool(re.search('--ps-backend( |=)INTRASITE', args_str)),
+        help='Port to use with ProxyStore',
+    )

diff --git a/psbench/proxystore.py b/psbench/proxystore.py
index c902ea5..bf89467 100644
--- a/psbench/proxystore.py
+++ b/psbench/proxystore.py
@@ -61,11 +61,12 @@ def init_store_from_args(
             **kwargs,
         )
-    elif args.ps_backend == STORES.MARGO.name:
+    elif args.ps_backend == STORES.INTRASITE.name:
         store = init_store(
-            STORES.MARGO,
-            name="margo-store",
-            port=args.ps_redis_port,
+            STORES.INTRASITE,
+            name="intrasite-store",
+            interface=args.ps_intrasite_interface,
+            port=args.ps_intrasite_port,
             **kwargs,
         )

diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py
index 789bb68..ceedc2c 100644
--- a/psbench/tasks/pong.py
+++ b/psbench/tasks/pong.py
@@ -62,7 +62,7 @@ def pong_proxy(
 
     from proxystore.proxy import Proxy
     from proxystore.proxy import is_resolved
-    from proxystore.store import resolve_async
+    from proxystore.store.base import StoreFactory
     from proxystore.store import get_store
     from proxystore.store import UnknownStoreError
 
@@ -73,7 +73,7 @@ def pong_proxy(
     assert not is_resolved(data)
 
     if sleep > 0.0:
-        resolve_async(data)
+        data.resolve_async()
         time.sleep(sleep)
 
     assert isinstance(data, bytes) and isinstance(data, Proxy)
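
For quick reference, the new INTRASITE backend slots into the same CLI as the
existing ProxyStore backends. A hypothetical invocation (the endpoint UUID,
interface name, and port are placeholders; the flag names come from the
argparse additions in this patch):

```
python -m psbench.benchmarks.funcx_tasks \
    --funcx-endpoint <ENDPOINT_UUID> \
    --ps-backend INTRASITE \
    --ps-intrasite-interface eth0 \
    --ps-intrasite-port 5555
```

From 0df235161cd9826777c3ad9af7fb13e2203e5159 Mon Sep 17 00:00:00 2001
From: valhayot
Date: Tue, 21 Mar 2023 21:34:59 +0000
Subject: [PATCH 03/31] Add DataSpaces to funcX pong benchmarks

---
 dataspaces.conf                        |  6 ++
 psbench/argparse.py                    | 14 ++++
 psbench/benchmarks/funcx_tasks/main.py | 90 +++++++++++++++++++++++++-
 psbench/tasks/pong.py                  | 58 +++++++++++++++++
 4 files changed, 166 insertions(+), 2 deletions(-)
 create mode 100644 dataspaces.conf

diff --git a/dataspaces.conf b/dataspaces.conf
new file mode 100644
index 0000000..0b3dd70
--- /dev/null
+++ b/dataspaces.conf
@@ -0,0 +1,6 @@
+## Config file for DataSpaces
+ndim = 1
+dims = 128
+max_versions = 10
+num_apps = 2
+

diff --git a/psbench/argparse.py b/psbench/argparse.py
index 94b22c8..3c13a1e 100644
--- a/psbench/argparse.py
+++ b/psbench/argparse.py
@@ -53,6 +53,20 @@ def add_ipfs_options(parser: argparse.ArgumentParser) -> None:
     )
 
 
+def add_dspaces_options(parser: argparse.ArgumentParser) -> None:
+    """Add CLI arguments for DataSpaces.
+
+    Args:
+        parser (ArgumentParser): parser object to add DataSpaces arguments to.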
+    """
+    parser.add_argument(
+        '--dspaces',
+        action='store_true',
+        default=False,
+        help='Use DataSpaces for data transfer.',
+    )
+
+
 def add_logging_options(parser: argparse.ArgumentParser) -> None:
     """Add CLI arguments for logging options."""
     group = parser.add_argument_group(

diff --git a/psbench/benchmarks/funcx_tasks/main.py b/psbench/benchmarks/funcx_tasks/main.py
index f13864d..c727421 100644
--- a/psbench/benchmarks/funcx_tasks/main.py
+++ b/psbench/benchmarks/funcx_tasks/main.py
@@ -22,6 +22,7 @@
 from proxystore.store.utils import get_key
 
 from psbench import ipfs
+from psbench.argparse import add_dspaces_options
 from psbench.argparse import add_funcx_options
 from psbench.argparse import add_ipfs_options
 from psbench.argparse import add_logging_options
@@ -31,6 +32,7 @@
 from psbench.logging import TESTING_LOG_LEVEL
 from psbench.proxystore import init_store_from_args
 from psbench.tasks.pong import pong
+from psbench.tasks.pong import pong_dspaces
 from psbench.tasks.pong import pong_ipfs
 from psbench.tasks.pong import pong_proxy
 from psbench.utils import randbytes
@@ -153,6 +155,79 @@ def time_task_ipfs(
     )
 
 
+def time_task_dspaces(
+    *,
+    fx: funcx.FuncXExecutor,
+    input_size: int,
+    output_size: int,
+    task_sleep: float,
+) -> TaskStats:
+    """Execute and time a single FuncX task with DataSpaces for data transfer.
+
+    Args:
+        fx (FuncXExecutor): FuncX Executor to submit task through.
+        input_size (int): number of bytes to send as input to task.
+        output_size (int): number of bytes task should return.
+        task_sleep (int): number of seconds to sleep inside task.
+
+    Returns:
+        TaskStats
+    """
+    import dspaces as ds
+    import numpy as np
+    from mpi4py import MPI
+
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+    size = comm.Get_size()
+    version = 1
+
+    client = ds.DSpaces()
+    path = str(uuid.uuid4())
+
+    data = randbytes(input_size)
+    # Each rank transfers the full payload at its own offset in the array.
+    start = time.perf_counter_ns()
+
+    client.Put(np.array(bytearray(data)), path, version=version, offset=((input_size * rank),))
+    fut = fx.submit(
+        pong_dspaces,
+        path,
+        input_size,
+        rank,
+        size,
+        version=version,
+        result_size=output_size,
+        sleep=task_sleep,
+    )
+
+    result = fut.result()
+
+    if result is not None:
+        out_path = result[0]
+        out_size = result[1]
+        data = client.Get(
+            out_path,
+            version,
+            lb=((out_size * rank),),
+            ub=((out_size * rank + out_size - 1),),
+            dtype=bytes,
+            timeout=-1,
+        ).tobytes()
+
+    end = time.perf_counter_ns()
+    assert isinstance(data, bytes)
+
+    return TaskStats(
+        proxystore_backend='DataSpaces',
+        task_name='pong',
+        input_size_bytes=input_size,
+        output_size_bytes=output_size,
+        task_sleep_seconds=task_sleep,
+        total_time_ms=(end - start) / 1e6,
+    )
+
+
 def time_task_proxy(
     *,
     fx: funcx.FuncXExecutor,
@@ -216,6 +291,7 @@ def runner(
     *,
     funcx_endpoint: str,
     store: Store | None,
+    use_dspaces: bool,
     use_ipfs: bool,
     ipfs_local_dir: str | None,
     ipfs_remote_dir: str | None,
@@ -232,6 +308,7 @@ def runner(
         'Starting test runner\n'
         f' - FuncX Endpoint: {funcx_endpoint}\n'
         f' - ProxyStore backend: {store_class_name}\n'
+        f' - DataSpaces enabled: {use_dspaces}\n'
         f' - IPFS enabled: {use_ipfs}\n'
         f' - Task type: ping-pong\n'
         f' - Task repeat: {task_repeat}\n'
@@ -240,9 +317,9 @@ def runner(
         f' - Task sleep time: {task_sleep} s',
     )
 
-    if store is not None and use_ipfs:
+    if store is not None and (use_ipfs or use_dspaces):
         raise ValueError(
-            'IPFS and ProxyStore cannot be used at the same time.',
+            f'{"IPFS" if use_ipfs else "DataSpaces"} and ProxyStore cannot be used at the same time.',
         )
 
     runner_start = time.perf_counter_ns()
@@ -277,6 +354,13 @@ def runner(
                 output_size=output_size,
                 task_sleep=task_sleep,
             )
+        elif use_dspaces:
+            stats = time_task_dspaces(
+                fx=fx,
+                input_size=input_size,
+                output_size=output_size,
+                task_sleep=task_sleep,
+            )
         else:
             stats = time_task(
                 fx=fx,
@@ -356,6 +440,7 @@ def main(argv: Sequence[str] | None = None) -> int:
     add_logging_options(parser)
     add_proxystore_options(parser, required=False)
     add_ipfs_options(parser)
+    add_dspaces_options(parser)
     args = parser.parse_args(argv)
 
     init_logging(args.log_file, args.log_level, force=True)
@@ -365,6 +450,7 @@ def main(argv: Sequence[str] | None = None) -> int:
     runner(
         funcx_endpoint=args.funcx_endpoint,
         store=store,
+        use_dspaces=args.dspaces,
         use_ipfs=args.ipfs,
         ipfs_local_dir=args.ipfs_local_dir,
         ipfs_remote_dir=args.ipfs_remote_dir,

diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py
index 78fbbf2..ce4dea4 100644
--- a/psbench/tasks/pong.py
+++ b/psbench/tasks/pong.py
@@ -71,6 +71,64 @@ def pong_ipfs(
         return None
 
 
+def pong_dspaces(
+    path: str,
+    data_size: int,
+    rank: int,
+    size: int,
+    *,
+    version: int = 1,
+    result_size: int = 0,
+    sleep: float = 0,
+) -> tuple[str, int] | None:
+    """Task that takes a DataSpaces path and returns data via DataSpaces.
+
+    Args:
+        path (str): filename of the DataSpaces stored data.
+        data_size (int): the size of the DataSpaces object.
+        rank (int): MPI rank.
+        size (int): MPI communication size.
+        version (int): The version of the data to access (default: 1).
+        result_size (int): size of results byte array (default: 0).
+        sleep (float): seconds to sleep for to simulate work (default: 0).
+
+    Returns:
+        Tuple of (filename, size) for the result data written back to
+        DataSpaces, or None if result_size is 0.
+    """
+    # Imports are local so the task body is self-contained when serialized.
+    import time
+    import uuid
+    import numpy as np
+
+    import dspaces as ds
+
+    from psbench.utils import randbytes
+
+    client = ds.DSpaces()
+    data = client.Get(
+        path,
+        version=version,
+        lb=((data_size * rank),),
+        ub=((data_size * rank + data_size - 1),),
+        dtype=bytes,
+        timeout=-1,
+    ).tobytes()
+
+    assert isinstance(data, bytes)
+    time.sleep(sleep)
+
+    if result_size > 0:
+        filepath = str(uuid.uuid4())
+        return_data = bytearray(randbytes(result_size))
+        client.Put(
+            np.array(return_data), filepath, version=version, offset=((result_size * rank),)
+        )
+        return (filepath, result_size)
+    else:
+        return None
+
+
 def pong_proxy(
     data: bytes,
     *,
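
The DataSpaces round trip that `time_task_dspaces` and `pong_dspaces`
implement reduces to a put/get pair against byte offsets in a shared array.
A minimal single-process sketch, assuming the `dspaces` Python bindings
expose the `Put`/`Get` signatures used in this patch (the calls below mirror
the benchmark code, not a documented API):

```python
import numpy as np
import dspaces as ds

client = ds.DSpaces()
payload = b'x' * 128
rank = 0  # the benchmark derives this from the MPI communicator

# Producer: write the payload at this rank's byte offset under a shared name.
client.Put(
    np.array(bytearray(payload)),
    'example-key',
    version=1,
    offset=((len(payload) * rank),),
)

# Consumer: read the same byte range back; timeout=-1 blocks until available.
data = client.Get(
    'example-key',
    version=1,
    lb=((len(payload) * rank),),
    ub=((len(payload) * rank + len(payload) - 1),),
    dtype=bytes,
    timeout=-1,
).tobytes()
assert data == payload
```

From 7cdfa743e2e60225aba1fcae285e82aeb623393a Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 27 Mar 2023 23:31:10 +0000
Subject: [PATCH 04/31] [pre-commit.ci] pre-commit autoupdate
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

updates:
- [github.com/charliermarsh/ruff-pre-commit: v0.0.257 → v0.0.259](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.257...v0.0.259)
---
 .pre-commit-config.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1fc5afb..171fc6b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -23,7 +23,7 @@ repos:
     hooks:
       - id: codespell
   - repo: 'https://github.com/charliermarsh/ruff-pre-commit'
-    rev: v0.0.257
+    rev: v0.0.259
     hooks:
       - id: ruff
         args:

From 76618cc09acd81ee9e275eda44f1061c20d46af7 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 4 Apr 2023 01:28:54 +0000
Subject: [PATCH 05/31] [pre-commit.ci]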
pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/psf/black: 23.1.0 → 23.3.0](https://github.com/psf/black/compare/23.1.0...23.3.0) - [github.com/charliermarsh/ruff-pre-commit: v0.0.259 → v0.0.260](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.259...v0.0.260) --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 171fc6b..dfed8b9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,7 +15,7 @@ repos: - id: name-tests-test - id: double-quote-string-fixer - repo: 'https://github.com/psf/black' - rev: 23.1.0 + rev: 23.3.0 hooks: - id: black - repo: 'https://github.com/codespell-project/codespell' @@ -23,7 +23,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.259 + rev: v0.0.260 hooks: - id: ruff args: From 6e671b628ad3c09edb1b4e5b5222bbf30f2dd3f2 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 3 Apr 2023 21:36:37 -0500 Subject: [PATCH 06/31] Fix Flake8-bugbear B018 complaint --- tests/init_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/init_test.py b/tests/init_test.py index 45708aa..b3935a5 100644 --- a/tests/init_test.py +++ b/tests/init_test.py @@ -1,9 +1,8 @@ from __future__ import annotations import psbench -import testing +import testing # noqa: F401 def test_version() -> None: - testing assert isinstance(psbench.__version__, str) From d3ce011c9396793f12c741c7d40df5e66ea6877e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 11 Apr 2023 00:00:29 +0000 Subject: [PATCH 07/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.260 → v0.0.261](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.260...v0.0.261) - [github.com/pre-commit/mirrors-mypy: v1.1.1 → v1.2.0](https://github.com/pre-commit/mirrors-mypy/compare/v1.1.1...v1.2.0) --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dfed8b9..1e3f021 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,13 +23,13 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.260 + rev: v0.0.261 hooks: - id: ruff args: - '--fix' - repo: 'https://github.com/pre-commit/mirrors-mypy' - rev: v1.1.1 + rev: v1.2.0 hooks: - id: mypy additional_dependencies: From 14c67dcc68964304d6b0577fdae63c06862e3617 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 25 Apr 2023 00:07:59 +0000 Subject: [PATCH 08/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.261 → v0.0.262](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.261...v0.0.262) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1e3f021..562c886 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: hooks: - id: codespell - 
repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.261 + rev: v0.0.262 hooks: - id: ruff args: From bb076377e037cef7444463c7e18bc80ed6a9fd26 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 May 2023 00:22:22 +0000 Subject: [PATCH 09/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.262 → v0.0.263](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.262...v0.0.263) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 562c886..d203e50 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.262 + rev: v0.0.263 hooks: - id: ruff args: From 41ac09a78d44d4662d1bf1ac4437e44255025ded Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 9 May 2023 00:42:38 +0000 Subject: [PATCH 10/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.263 → v0.0.265](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.263...v0.0.265) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d203e50..a32b041 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.263 + rev: v0.0.265 hooks: - id: ruff args: From 38030cc6ce3b17dc4b869987a182c61e335f0611 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Tue, 2 May 2023 18:39:21 -0500 Subject: [PATCH 11/31] Upgrade most benchmarks to ProxyStore v0.5.0 --- psbench/argparse.py | 5 +- psbench/benchmarks/colmena_rtt/main.py | 2 +- psbench/benchmarks/endpoint_qps/main.py | 27 +++-- psbench/benchmarks/endpoint_qps/routes.py | 47 ++++----- psbench/benchmarks/funcx_tasks/main.py | 19 ++-- psbench/benchmarks/remote_ops/main.py | 14 ++- psbench/proxystore.py | 112 ++++++++------------- psbench/tasks/pong.py | 16 +-- pyproject.toml | 2 +- tests/argparse_test.py | 10 -- tests/benchmarks/endpoint_qps/main_test.py | 4 +- tests/benchmarks/funcx_tasks/main_test.py | 17 ++-- tests/proxystore_test.py | 55 +++++----- tests/tasks/pong_test.py | 9 +- 14 files changed, 156 insertions(+), 183 deletions(-) diff --git a/psbench/argparse.py b/psbench/argparse.py index 3c13a1e..673ecc6 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -121,7 +121,6 @@ def add_proxystore_options( 'GLOBUS', 'REDIS', 'ENDPOINT', - 'WEBSOCKET', 'MARGO', 'UCX', 'ZMQ', @@ -155,7 +154,7 @@ def add_proxystore_options( metavar='HOST', required=bool( re.search( - '--ps-backend( |=)(REDIS|WEBSOCKET|MARGO|UCX|ZMQ)', + '--ps-backend( |=)(REDIS|MARGO|UCX|ZMQ)', args_str, ), ), @@ -167,7 +166,7 @@ def add_proxystore_options( type=int, required=bool( re.search( - '--ps-backend( |=)(REDIS|WEBSOCKET|MARGO|UCX|ZMQ)', + '--ps-backend( |=)(REDIS|MARGO|UCX|ZMQ)', args_str, ), ), diff --git a/psbench/benchmarks/colmena_rtt/main.py b/psbench/benchmarks/colmena_rtt/main.py index eeee8c9..44e7834 
100644 --- a/psbench/benchmarks/colmena_rtt/main.py +++ b/psbench/benchmarks/colmena_rtt/main.py @@ -298,7 +298,7 @@ def main(argv: Sequence[str] | None = None) -> int: args = parser.parse_args(argv) init_logging(args.log_file, args.log_level, force=True) - store = init_store_from_args(args, stats=True) + store = init_store_from_args(args, metrics=True) output_dir = os.path.join( args.output_dir, diff --git a/psbench/benchmarks/endpoint_qps/main.py b/psbench/benchmarks/endpoint_qps/main.py index 2ead9ac..c8a28bd 100644 --- a/psbench/benchmarks/endpoint_qps/main.py +++ b/psbench/benchmarks/endpoint_qps/main.py @@ -19,7 +19,7 @@ else: # pragma: <3.8 cover from typing_extensions import Literal -from proxystore.store.endpoint import EndpointStore +from proxystore.connectors.endpoint import EndpointConnector from psbench.argparse import add_logging_options from psbench.benchmarks.endpoint_qps import routes @@ -74,7 +74,7 @@ def run( Returns: RunStats with summary of test run. """ - store = EndpointStore('store', endpoints=[endpoint], cache_size=0) + connector = EndpointConnector([endpoint]) logger.log( TESTING_LOG_LEVEL, @@ -83,15 +83,20 @@ def run( func: Callable[[float], routes.Stats] if route == 'ENDPOINT': - func = functools.partial(routes.endpoint_test, store, sleep, queries) + func = functools.partial( + routes.endpoint_test, + connector, + sleep, + queries, + ) elif route == 'EVICT': - func = functools.partial(routes.evict_test, store, sleep, queries) + func = functools.partial(routes.evict_test, connector, sleep, queries) elif route == 'EXISTS': - func = functools.partial(routes.exists_test, store, sleep, queries) + func = functools.partial(routes.exists_test, connector, sleep, queries) elif route == 'GET': func = functools.partial( routes.get_test, - store, + connector, sleep, queries, payload_size, @@ -99,7 +104,7 @@ def run( elif route == 'SET': func = functools.partial( routes.set_test, - store, + connector, sleep, queries, payload_size, @@ -172,6 +177,8 @@ def run( f'total QPS: {run_stats.qps:.3f}', ) + connector.close() + return run_stats @@ -243,21 +250,21 @@ def main(argv: Sequence[str] | None = None) -> int: '--payload-sizes', type=int, nargs='+', - default=0, + default=[0], help='Payload sizes for GET/SET queries', ) parser.add_argument( '--workers', type=int, nargs='+', - default=1, + default=[1], help='Number of workers (processes) making queries', ) parser.add_argument( '--sleep', type=float, nargs='+', - default=0, + default=[0], help='Sleeps (seconds) between queries', ) parser.add_argument( diff --git a/psbench/benchmarks/endpoint_qps/routes.py b/psbench/benchmarks/endpoint_qps/routes.py index c036c09..b62a5f7 100644 --- a/psbench/benchmarks/endpoint_qps/routes.py +++ b/psbench/benchmarks/endpoint_qps/routes.py @@ -6,7 +6,8 @@ from typing import NamedTuple import requests -from proxystore.store.endpoint import EndpointStore +from proxystore.connectors.endpoint import EndpointConnector +from proxystore.connectors.endpoint import EndpointKey from psbench.utils import randbytes from psbench.utils import wait_until @@ -24,7 +25,7 @@ class Stats(NamedTuple): def endpoint_test( - store: EndpointStore, + connector: EndpointConnector, sleep: float, queries: int, start_time: float | None = None, @@ -32,11 +33,11 @@ def endpoint_test( """Endpoint /endpoint route test. Note: - EndpointStore does not have an interface for querying /endpoint + EndpointConnector does not have an interface for querying /endpoint so we do so manually with the request library. 
Args: - store (EndpointStore): Store interface to use for querying endpoint. + connector (EndpointConnector): Connector to use for querying endpoint. sleep (float): sleep (seconds) between queries. queries (int): number of queries to make. start_time (float): UNIX timestamp to sleep until for starting test. @@ -53,7 +54,7 @@ def endpoint_test( for _ in range(queries): start = time.perf_counter_ns() response = requests.get( - f'http://{store.endpoint_host}:{store.endpoint_port}/endpoint', + f'http://{connector.endpoint_host}:{connector.endpoint_port}/endpoint', # noqa: E501 ) end = time.perf_counter_ns() latencies.append((end - start) / 1e6) @@ -71,7 +72,7 @@ def endpoint_test( def evict_test( - store: EndpointStore, + connector: EndpointConnector, sleep: float, queries: int, start_time: float | None = None, @@ -79,7 +80,7 @@ def evict_test( """Endpoint /evict route test. Args: - store (EndpointStore): Store interface to use for querying endpoint. + connector (EndpointConnector): Connector to use for querying endpoint. sleep (float): sleep (seconds) between queries. queries (int): number of queries to make. start_time (float): UNIX timestamp to sleep until for starting test. @@ -93,10 +94,10 @@ def evict_test( latencies: list[float] = [] - fake_key = str(uuid.uuid4()) + fake_key = EndpointKey(str(uuid.uuid4()), None) for _ in range(queries): start = time.perf_counter_ns() - store.evict(fake_key) + connector.evict(fake_key) end = time.perf_counter_ns() latencies.append((end - start) / 1e6) time.sleep(sleep) @@ -112,7 +113,7 @@ def evict_test( def exists_test( - store: EndpointStore, + connector: EndpointConnector, sleep: float, queries: int, start_time: float | None = None, @@ -120,7 +121,7 @@ def exists_test( """Endpoint /exists route test. Args: - store (EndpointStore): Store interface to use for querying endpoint. + connector (EndpointConnector): Connector to use for querying endpoint. sleep (float): sleep (seconds) between queries. queries (int): number of queries to make. start_time (float): UNIX timestamp to sleep until for starting test. @@ -134,10 +135,10 @@ def exists_test( latencies: list[float] = [] - fake_key = str(uuid.uuid4()) + fake_key = EndpointKey(str(uuid.uuid4()), None) for _ in range(queries): start = time.perf_counter_ns() - store.exists(fake_key) + connector.exists(fake_key) end = time.perf_counter_ns() latencies.append((end - start) / 1e6) time.sleep(sleep) @@ -153,7 +154,7 @@ def exists_test( def get_test( - store: EndpointStore, + connector: EndpointConnector, sleep: float, queries: int, payload_size: int, @@ -162,7 +163,7 @@ def get_test( """Endpoint /get route test. Args: - store (EndpointStore): Store interface to use for querying endpoint. + connector (EndpointConnector): Connector to use for querying endpoint. sleep (float): sleep (seconds) between queries. queries (int): number of queries to make. payload_size (int): payload size in bytes. 
@@ -177,17 +178,17 @@ def get_test( latencies: list[float] = [] - key = store.set(randbytes(payload_size)) + key = connector.put(randbytes(payload_size)) for _ in range(queries): start = time.perf_counter_ns() - res = store.get(key) + res = connector.get(key) end = time.perf_counter_ns() latencies.append((end - start) / 1e6) assert res is not None time.sleep(sleep) - store.evict(key) + connector.evict(key) return Stats( queries=queries, @@ -200,7 +201,7 @@ def get_test( def set_test( - store: EndpointStore, + connector: EndpointConnector, sleep: float, queries: int, payload_size: int, @@ -214,7 +215,7 @@ def set_test( are included in the timing. Args: - store (EndpointStore): Store interface to use for querying endpoint. + connector (EndpointConnector): Connector to use for querying endpoint. sleep (float): sleep (seconds) between queries. queries (int): number of queries to make. payload_size (int): payload size in bytes. @@ -232,12 +233,12 @@ def set_test( data = randbytes(payload_size) for _ in range(queries): - # Note we do set/evict here to keep store memory in check + # Note we do put/evict here to keep connector memory in check start = time.perf_counter_ns() - key = store.set(data) + key = connector.put(data) end = time.perf_counter_ns() latencies.append((end - start) / 1e6) - store.evict(key) + connector.evict(key) time.sleep(sleep) return Stats( diff --git a/psbench/benchmarks/funcx_tasks/main.py b/psbench/benchmarks/funcx_tasks/main.py index c727421..ea0c6ad 100644 --- a/psbench/benchmarks/funcx_tasks/main.py +++ b/psbench/benchmarks/funcx_tasks/main.py @@ -51,11 +51,11 @@ class TaskStats: task_sleep_seconds: float total_time_ms: float input_get_ms: float | None = None - input_set_ms: float | None = None + input_put_ms: float | None = None input_proxy_ms: float | None = None input_resolve_ms: float | None = None output_get_ms: float | None = None - output_set_ms: float | None = None + output_put_ms: float | None = None output_proxy_ms: float | None = None output_resolve_ms: float | None = None @@ -269,21 +269,24 @@ def time_task_proxy( assert isinstance(result, bytes) assert isinstance(result, Proxy) + input_metrics = store.metrics.get_metrics(proxy) + output_metrics = store.metrics.get_metrics(result) + return TaskStats( - proxystore_backend=store.__class__.__name__, + proxystore_backend=store.connector.__class__.__name__, task_name='pong', input_size_bytes=input_size, output_size_bytes=output_size, task_sleep_seconds=task_sleep, total_time_ms=(end - start) / 1e6, input_get_ms=task_proxy_stats.input_get_ms, - input_set_ms=store.stats(proxy)['set'].avg_time_ms, - input_proxy_ms=store.stats(proxy)['proxy'].avg_time_ms, + input_put_ms=input_metrics.times['store.put'].avg_time_ms, + input_proxy_ms=input_metrics.times['store.proxy'].avg_time_ms, input_resolve_ms=task_proxy_stats.input_resolve_ms, - output_get_ms=store.stats(result)['get'].avg_time_ms, - output_set_ms=task_proxy_stats.output_set_ms, + output_get_ms=output_metrics.times['store.get'].avg_time_ms, + output_put_ms=task_proxy_stats.output_put_ms, output_proxy_ms=task_proxy_stats.output_proxy_ms, - output_resolve_ms=store.stats(result)['resolve'].avg_time_ms, + output_resolve_ms=output_metrics.times['factory.resolve'].avg_time_ms, ) diff --git a/psbench/benchmarks/remote_ops/main.py b/psbench/benchmarks/remote_ops/main.py index a37e0ee..1582fdd 100644 --- a/psbench/benchmarks/remote_ops/main.py +++ b/psbench/benchmarks/remote_ops/main.py @@ -9,6 +9,7 @@ import asyncio import logging import socket +import statistics import 
sys import uuid from typing import Any @@ -47,6 +48,7 @@ class RunStats(NamedTuple): avg_time_ms: float min_time_ms: float max_time_ms: float + stdev_time_ms: float avg_bandwidth_mbps: float | None @@ -121,6 +123,9 @@ async def run_endpoint( avg_time_ms=sum(times_ms) / len(times_ms), min_time_ms=min(times_ms), max_time_ms=max(times_ms), + stdev_time_ms=( + statistics.stdev(times_ms) if len(times_ms) > 1 else 0.0 + ), avg_bandwidth_mbps=avg_bandwidth_mbps, ) @@ -176,6 +181,9 @@ def run_redis( avg_time_ms=sum(times_ms) / len(times_ms), min_time_ms=min(times_ms), max_time_ms=max(times_ms), + stdev_time_ms=( + statistics.stdev(times_ms) if len(times_ms) > 1 else 0.0 + ), avg_bandwidth_mbps=avg_bandwidth_mbps, ) @@ -196,7 +204,7 @@ async def runner_endpoint( ops (str): endpoint operations to test. payload_sizes (int): bytes to send/receive for GET/SET operations. repeat (int): number of times to repeat operations. - server (str): signaling server address + server (str): relay server address csv_file (str): optional csv filepath to log results to. """ if csv_file is not None: @@ -205,7 +213,7 @@ async def runner_endpoint( async with Endpoint( name=socket.gethostname(), uuid=uuid.uuid4(), - signaling_server=server, + relay_server=server, ) as endpoint: for op in ops: for i, payload_size in enumerate(payload_sizes): @@ -316,7 +324,7 @@ def main(argv: Sequence[str] | None = None) -> int: parser.add_argument( '--server', required='ENDPOINT' in sys.argv, - help='Signaling server address for connecting to the remote endpoint', + help='Relay server address for connecting to the remote endpoint', ) parser.add_argument( '--repeat', diff --git a/psbench/proxystore.py b/psbench/proxystore.py index 300e025..9c889ea 100644 --- a/psbench/proxystore.py +++ b/psbench/proxystore.py @@ -3,23 +3,23 @@ import argparse from typing import Any +from proxystore.connectors.connector import Connector +from proxystore.connectors.dim.margo import MargoConnector +from proxystore.connectors.dim.ucx import UCXConnector +from proxystore.connectors.dim.zmq import ZeroMQConnector +from proxystore.connectors.endpoint import EndpointConnector +from proxystore.connectors.file import FileConnector +from proxystore.connectors.globus import GlobusConnector +from proxystore.connectors.globus import GlobusEndpoints +from proxystore.connectors.redis import RedisConnector from proxystore.store import register_store -from proxystore.store.base import Store -from proxystore.store.dim.margo import MargoStore -from proxystore.store.dim.ucx import UCXStore -from proxystore.store.dim.websockets import WebsocketStore -from proxystore.store.dim.zmq import ZeroMQStore -from proxystore.store.endpoint import EndpointStore -from proxystore.store.file import FileStore -from proxystore.store.globus import GlobusEndpoints -from proxystore.store.globus import GlobusStore -from proxystore.store.redis import RedisStore +from proxystore.store import Store def init_store_from_args( args: argparse.Namespace, **kwargs: Any, -) -> Store | None: +) -> Store[Any] | None: """Initialize a ProxyStore Store from CLI arguments. Usage: @@ -30,71 +30,39 @@ def init_store_from_args( Args: args (Namespace): namespace returned by argument parser. - kwargs: additional keyword arguments to pass to store constructor. + kwargs: additional keyword arguments to pass to the Store. Returns: Store or None if no store was specified. 
""" - store: Store | None = None + if not args.ps_backend: + return None - if args.ps_backend: - if args.ps_backend == 'ENDPOINT': - store = EndpointStore( - name='endpoint-store', - endpoints=args.ps_endpoints, - **kwargs, - ) - elif args.ps_backend == 'FILE': - store = FileStore( - name='file-store', - store_dir=args.ps_file_dir, - **kwargs, - ) - elif args.ps_backend == 'GLOBUS': - endpoints = GlobusEndpoints.from_json(args.ps_globus_config) - store = GlobusStore( - name='globus-store', - endpoints=endpoints, - **kwargs, - ) - elif args.ps_backend == 'REDIS': - store = RedisStore( - name='redis-store', - hostname=args.ps_host, - port=args.ps_port, - **kwargs, - ) - elif args.ps_backend == 'WEBSOCKET': - store = WebsocketStore( - name='websocket-store', - interface=args.ps_host, - port=args.ps_port, - **kwargs, - ) - elif args.ps_backend == 'MARGO': - store = MargoStore( - name='margo-store', - interface=args.ps_host, - port=args.ps_port, - protocol=args.ps_margo_protocol, - **kwargs, - ) - elif args.ps_backend == 'UCX': - store = UCXStore( - name='ucx-store', - interface=args.ps_host, - port=args.ps_port, - **kwargs, - ) - elif args.ps_backend == 'ZMQ': - store = ZeroMQStore( - name='zmq-store', - interface=args.ps_host, - port=args.ps_port, - **kwargs, - ) - else: - raise ValueError(f'Invalid backend: {args.ps_backend}') - register_store(store) + connector: Connector + + if args.ps_backend == 'ENDPOINT': + connector = EndpointConnector(args.ps_endpoints) + elif args.ps_backend == 'FILE': + connector = FileConnector(args.ps_file_dir) + elif args.ps_backend == 'GLOBUS': + endpoints = GlobusEndpoints.from_json(args.ps_globus_config) + connector = GlobusConnector(endpoints) + elif args.ps_backend == 'REDIS': + connector = RedisConnector(args.ps_host, args.ps_port) + elif args.ps_backend == 'MARGO': + connector = MargoConnector( + interface=args.ps_host, + port=args.ps_port, + protocol=args.ps_margo_protocol, + ) + elif args.ps_backend == 'UCX': + connector = UCXConnector(interface=args.ps_host, port=args.ps_port) + elif args.ps_backend == 'ZMQ': + connector = ZeroMQConnector(interface=args.ps_host, port=args.ps_port) + else: + raise ValueError(f'Invalid backend: {args.ps_backend}') + + store = Store(f'{args.ps_backend}-STORE', connector, **kwargs) + register_store(store) return store diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py index ce4dea4..ca51e0a 100644 --- a/psbench/tasks/pong.py +++ b/psbench/tasks/pong.py @@ -8,7 +8,7 @@ class ProxyStats(NamedTuple): input_get_ms: float | None = None input_resolve_ms: float | None = None - output_set_ms: float | None = None + output_put_ms: float | None = None output_proxy_ms: float | None = None @@ -181,12 +181,16 @@ def pong_proxy( result: Proxy[bytes] = store.proxy(result_data, evict=evict_result) stats: ProxyStats | None = None - if store.has_stats: + if store.metrics is not None: + input_metrics = store.metrics.get_metrics(data) + output_metrics = store.metrics.get_metrics(result) stats = ProxyStats( - input_get_ms=store.stats(data)['get'].avg_time_ms, - input_resolve_ms=store.stats(data)['resolve'].avg_time_ms, - output_set_ms=store.stats(result)['set'].avg_time_ms, - output_proxy_ms=store.stats(result)['proxy'].avg_time_ms, + input_get_ms=input_metrics.times['store.get'].avg_time_ms, + input_resolve_ms=input_metrics.times[ + 'factory.resolve' + ].avg_time_ms, + output_put_ms=output_metrics.times['store.put'].avg_time_ms, + output_proxy_ms=output_metrics.times['store.proxy'].avg_time_ms, ) return (result, stats) diff --git 
a/pyproject.toml b/pyproject.toml index 1832d24..26097d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ dependencies = [ "colmena[funcx]==0.4.2", "funcx==1.0.8", "funcx-endpoint==1.0.8", - "proxystore[endpoints]==0.4.1", + "proxystore[all]==0.5.0", "redis==3.4.1", "requests==2.28.2", "importlib-metadata; python_version<'3.8'", diff --git a/tests/argparse_test.py b/tests/argparse_test.py index 6b2deed..b95192f 100644 --- a/tests/argparse_test.py +++ b/tests/argparse_test.py @@ -59,16 +59,6 @@ def test_add_proxystore_options() -> None: '1234', ], ) - parser.parse_args( - [ - '--ps-backend', - 'WEBSOCKET', - '--ps-host', - 'localhost', - '--ps-port', - '1234', - ], - ) parser.parse_args( [ '--ps-backend', diff --git a/tests/benchmarks/endpoint_qps/main_test.py b/tests/benchmarks/endpoint_qps/main_test.py index a6ce736..56434d6 100644 --- a/tests/benchmarks/endpoint_qps/main_test.py +++ b/tests/benchmarks/endpoint_qps/main_test.py @@ -34,14 +34,14 @@ def test_run( route: Literal['GET', 'SET', 'EXISTS', 'EVICT', 'ENDPOINT'], ) -> None: with mock.patch( - 'psbench.benchmarks.endpoint_qps.main.EndpointStore', + 'psbench.benchmarks.endpoint_qps.main.EndpointConnector', ), mock.patch('multiprocessing.pool.Pool.apply_async', new=call_directly): run('UUID', route, payload_size=1, queries=2, sleep=0, workers=2) def test_runner() -> None: with mock.patch( - 'psbench.benchmarks.endpoint_qps.main.EndpointStore', + 'psbench.benchmarks.endpoint_qps.main.EndpointConnector', ), mock.patch('multiprocessing.pool.Pool.apply_async', new=call_directly): runner( 'UUID', diff --git a/tests/benchmarks/funcx_tasks/main_test.py b/tests/benchmarks/funcx_tasks/main_test.py index 3ae8c41..336d0c1 100644 --- a/tests/benchmarks/funcx_tasks/main_test.py +++ b/tests/benchmarks/funcx_tasks/main_test.py @@ -7,9 +7,10 @@ from unittest import mock import pytest +from proxystore.connectors.local import LocalConnector from proxystore.store import register_store +from proxystore.store import Store from proxystore.store import unregister_store -from proxystore.store.local import LocalStore from psbench.benchmarks.funcx_tasks.main import main from psbench.benchmarks.funcx_tasks.main import runner @@ -72,7 +73,7 @@ def test_time_task_ipfs(tmp_path: pathlib.Path) -> None: def test_time_task_proxy() -> None: fx = mock_executor() - store = LocalStore(name='test-time-task-store', stats=True) + store = Store('test-time-task-store', LocalConnector(), metrics=True) register_store(store) stats = time_task_proxy( @@ -83,17 +84,17 @@ def test_time_task_proxy() -> None: task_sleep=0.01, ) - assert stats.proxystore_backend == 'LocalStore' + assert stats.proxystore_backend == 'LocalConnector' assert stats.input_size_bytes == 100 assert stats.output_size_bytes == 50 assert stats.task_sleep_seconds == 0.01 assert stats.total_time_ms >= 10 assert stats.input_get_ms is not None and stats.input_get_ms > 0 - assert stats.input_set_ms is not None and stats.input_set_ms > 0 + assert stats.input_put_ms is not None and stats.input_put_ms > 0 assert stats.input_proxy_ms is not None and stats.input_proxy_ms > 0 assert stats.input_resolve_ms is not None and stats.input_resolve_ms > 0 assert stats.output_get_ms is not None and stats.output_get_ms > 0 - assert stats.output_set_ms is not None and stats.output_set_ms > 0 + assert stats.output_put_ms is not None and stats.output_put_ms > 0 assert stats.output_proxy_ms is not None and stats.output_proxy_ms > 0 assert stats.output_resolve_ms is not None and stats.output_resolve_ms > 0 
unregister_store(store) @@ -113,7 +114,7 @@ def test_runner( caplog.set_level(logging.ERROR) if use_proxystore: - store = LocalStore(name='test-runner-store', stats=True) + store = Store('test-runner-store', LocalConnector(), metrics=True) register_store(store) else: store = None @@ -156,7 +157,7 @@ def test_runner( def test_runner_error() -> None: - with LocalStore(name='test-runner-store') as store: + with Store('test-runner-store', LocalConnector()) as store: with pytest.raises(ValueError): runner( funcx_endpoint=str(uuid.uuid4()), @@ -188,7 +189,7 @@ def test_main(mock_runner) -> None: with mock.patch( 'psbench.benchmarks.funcx_tasks.main.init_store_from_args', - return_value=LocalStore('test-main-store'), + return_value=Store('test-main-store', LocalConnector()), ): main( [ diff --git a/tests/proxystore_test.py b/tests/proxystore_test.py index 5bb7b94..6074ff9 100644 --- a/tests/proxystore_test.py +++ b/tests/proxystore_test.py @@ -5,15 +5,14 @@ from unittest import mock import pytest -from proxystore.store.base import Store -from proxystore.store.dim.margo import MargoStore -from proxystore.store.dim.ucx import UCXStore -from proxystore.store.dim.websockets import WebsocketStore -from proxystore.store.dim.zmq import ZeroMQStore -from proxystore.store.endpoint import EndpointStore -from proxystore.store.file import FileStore -from proxystore.store.globus import GlobusStore -from proxystore.store.redis import RedisStore +from proxystore.connectors.connector import Connector +from proxystore.connectors.dim.margo import MargoConnector +from proxystore.connectors.dim.ucx import UCXConnector +from proxystore.connectors.dim.zmq import ZeroMQConnector +from proxystore.connectors.endpoint import EndpointConnector +from proxystore.connectors.file import FileConnector +from proxystore.connectors.globus import GlobusConnector +from proxystore.connectors.redis import RedisConnector from psbench.proxystore import init_store_from_args @@ -21,37 +20,32 @@ @pytest.mark.parametrize( 'backend,backend_type,kwargs', ( - ('ENDPOINT', EndpointStore, {'ps_endpoints': ['abcd']}), - ('FILE', FileStore, {'ps_file_dir': '/tmp/file'}), - ('GLOBUS', GlobusStore, {'ps_globus_config': '/tmp/file'}), + ('ENDPOINT', EndpointConnector, {'ps_endpoints': ['abcd']}), + ('FILE', FileConnector, {'ps_file_dir': '/tmp/file'}), + ('GLOBUS', GlobusConnector, {'ps_globus_config': '/tmp/file'}), ( 'REDIS', - RedisStore, - {'ps_host': 'localhost', 'ps_port': 1234}, - ), - ( - 'WEBSOCKET', - WebsocketStore, + RedisConnector, {'ps_host': 'localhost', 'ps_port': 1234}, ), ( 'MARGO', - MargoStore, + MargoConnector, { 'ps_host': 'localhost', 'ps_port': 1234, 'ps_margo_protocol': 'tcp', }, ), - ('UCX', UCXStore, {'ps_host': 'localhost', 'ps_port': 1234}), - ('ZMQ', ZeroMQStore, {'ps_host': 'localhost', 'ps_port': 1234}), + ('UCX', UCXConnector, {'ps_host': 'localhost', 'ps_port': 1234}), + ('ZMQ', ZeroMQConnector, {'ps_host': 'localhost', 'ps_port': 1234}), (None, None, {}), ('INVALID_BACKEND', None, {}), ), ) def test_store_from_args( backend: str | None, - backend_type: type[Store] | None, + backend_type: type[Connector] | None, kwargs: dict[str, Any], ) -> None: args = argparse.Namespace() @@ -60,28 +54,25 @@ def test_store_from_args( setattr(args, key, value) with mock.patch('psbench.proxystore.register_store'), mock.patch( - 'psbench.proxystore.FileStore', - ), mock.patch('psbench.proxystore.RedisStore'), mock.patch( - 'psbench.proxystore.EndpointStore', + 'psbench.proxystore.FileConnector', + ), 
mock.patch('psbench.proxystore.RedisConnector'), mock.patch(
+        'psbench.proxystore.EndpointConnector',
     ), mock.patch(
-        'psbench.proxystore.GlobusStore',
+        'psbench.proxystore.GlobusConnector',
     ), mock.patch(
         'psbench.proxystore.GlobusEndpoints.from_json',
     ), mock.patch(
-        'psbench.proxystore.WebsocketStore',
-    ), mock.patch(
-        'psbench.proxystore.MargoStore',
+        'psbench.proxystore.MargoConnector',
     ), mock.patch(
-        'psbench.proxystore.UCXStore',
+        'psbench.proxystore.UCXConnector',
     ), mock.patch(
-        'psbench.proxystore.ZeroMQStore',
+        'psbench.proxystore.ZeroMQConnector',
     ):
         if backend in [
             'ENDPOINT',
             'FILE',
             'GLOBUS',
             'REDIS',
-            'WEBSOCKET',
             'MARGO',
             'UCX',
             'ZMQ',

diff --git a/tests/tasks/pong_test.py b/tests/tasks/pong_test.py
index a5f79b5..b5337bf 100644
--- a/tests/tasks/pong_test.py
+++ b/tests/tasks/pong_test.py
@@ -4,10 +4,11 @@
 import time
 
 import pytest
+from proxystore.connectors.local import LocalConnector
 from proxystore.proxy import Proxy
 from proxystore.store import register_store
+from proxystore.store import Store
 from proxystore.store import unregister_store
-from proxystore.store.local import LocalStore
 
 from psbench import ipfs
 from psbench.tasks.pong import pong
@@ -42,7 +43,7 @@ def test_pong_ipfs(tmp_path: pathlib.Path):
 
 
 def test_pong_proxy() -> None:
-    store = LocalStore(name='pong-proxy-store')
+    store = Store('pong-proxy-store', LocalConnector())
     register_store(store)
 
     input_data: Proxy[bytes] = store.proxy(b'abcd')
@@ -58,14 +59,14 @@ def test_pong_proxy() -> None:
 
 
 def test_pong_proxy_stats() -> None:
-    store = LocalStore(name='pong-proxy-stats-store', stats=True)
+    store = Store('pong-proxy-stats-store', LocalConnector(), metrics=True)
     register_store(store)
 
     input_data: Proxy[bytes] = store.proxy(b'abcd')
     _, stats = pong_proxy(input_data, result_size=10)
 
     assert stats is not None
     assert stats.input_get_ms is not None and stats.input_get_ms > 0
     assert stats.input_resolve_ms is not None and stats.input_resolve_ms > 0
-    assert stats.output_set_ms is not None and stats.output_set_ms > 0
+    assert stats.output_put_ms is not None and stats.output_put_ms > 0
     assert stats.output_proxy_ms is not None and stats.output_proxy_ms > 0
 
     unregister_store(store)
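
For orientation, the net effect of this migration: backends are now
`Connector` instances composed into a generic `Store`, and per-proxy timing
moves from `store.stats(...)` to `store.metrics.get_metrics(...)`. A minimal
sketch against ProxyStore 0.5.0 using the `FileConnector` (any connector from
this patch composes the same way):

```python
import tempfile

from proxystore.connectors.file import FileConnector
from proxystore.proxy import Proxy
from proxystore.store import register_store
from proxystore.store import Store

# metrics=True replaces the old stats=True flag on the Store subclasses.
store = Store('example-store', FileConnector(tempfile.mkdtemp()), metrics=True)
register_store(store)

proxy: Proxy[bytes] = store.proxy(b'payload')
assert bytes(proxy) == b'payload'  # resolves lazily on first access

# Timing data is keyed by operation name, as used elsewhere in this patch.
metrics = store.metrics.get_metrics(proxy)
print(metrics.times['store.proxy'].avg_time_ms)

store.close()
```

From 196dcc4b6f8c2b59a9739c6f4cc25dca07112d44 Mon Sep 17 00:00:00 2001
From: valhayot
Date: Fri, 12 May 2023 21:20:34 -0400
Subject: [PATCH 12/31] Add options to pass optional address and interface to DIMs

---
 psbench/argparse.py                    | 14 +++++++++++++-
 psbench/benchmarks/funcx_tasks/main.py |  2 +-
 psbench/proxystore.py                  |  7 ++++---
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/psbench/argparse.py b/psbench/argparse.py
index 673ecc6..19b5910 100644
--- a/psbench/argparse.py
+++ b/psbench/argparse.py
@@ -154,7 +154,7 @@ def add_proxystore_options(
         metavar='HOST',
         required=bool(
             re.search(
-                '--ps-backend( |=)(REDIS|MARGO|UCX|ZMQ)',
+                '--ps-backend( |=)(REDIS)',
                 args_str,
             ),
         ),
@@ -178,3 +178,15 @@ def add_proxystore_options(
         help='Optionally specify the Margo protocol to use with ProxyStore',
         default='tcp',
     )
+    group.add_argument(
+        '--ps-address',
+        metavar='ADDRESS',
+        default=None,
+        help='Optional specify host IP address that can be used by the DIMs'
+    )
+    group.add_argument(
+        '--ps-interface',
+        metavar='INTERFACE',
+        default=None,
+        help='Optionally provide interface name to be used by the DIMs'
+    )

diff --git a/psbench/benchmarks/funcx_tasks/main.py b/psbench/benchmarks/funcx_tasks/main.py
index ea0c6ad..5c940ef 100644
--- a/psbench/benchmarks/funcx_tasks/main.py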
+++ b/psbench/benchmarks/funcx_tasks/main.py
@@ -448,7 +448,7 @@ def main(argv: Sequence[str] | None = None) -> int:
 
     init_logging(args.log_file, args.log_level, force=True)
 
-    store = init_store_from_args(args, stats=True)
+    store = init_store_from_args(args, metrics=True)
 
     runner(
         funcx_endpoint=args.funcx_endpoint,

diff --git a/psbench/proxystore.py b/psbench/proxystore.py
index 9c889ea..88a8c34 100644
--- a/psbench/proxystore.py
+++ b/psbench/proxystore.py
@@ -51,14 +51,15 @@ def init_store_from_args(
         connector = RedisConnector(args.ps_host, args.ps_port)
     elif args.ps_backend == 'MARGO':
         connector = MargoConnector(
-            interface=args.ps_host,
             port=args.ps_port,
             protocol=args.ps_margo_protocol,
+            address=args.ps_address,
+            interface=args.ps_interface,
         )
     elif args.ps_backend == 'UCX':
-        connector = UCXConnector(interface=args.ps_host, port=args.ps_port)
+        connector = UCXConnector(port=args.ps_port, interface=args.ps_interface, address=args.ps_address)
     elif args.ps_backend == 'ZMQ':
-        connector = ZeroMQConnector(interface=args.ps_host, port=args.ps_port)
+        connector = ZeroMQConnector(port=args.ps_port, interface=args.ps_interface, address=args.ps_address)
     else:
         raise ValueError(f'Invalid backend: {args.ps_backend}')
 
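With these flags, a DIM backend can be pointed at a specific NIC or address
without reusing --ps-host. A hypothetical ZMQ invocation (endpoint UUID,
port, and interface are placeholders; per the regex change above, --ps-host
is now only required for REDIS):

```
python -m psbench.benchmarks.funcx_tasks \
    --funcx-endpoint <ENDPOINT_UUID> \
    --ps-backend ZMQ \
    --ps-port 5555 \
    --ps-interface eth0
```

From 16f451548dc1a7617ba41a37ca858204a7fe0e80 Mon Sep 17 00:00:00 2001
From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com>
Date: Fri, 12 May 2023 21:27:22 -0500
Subject: [PATCH 13/31] Migrate funcx-tasks to globus-compute-tasks

---
 ...funcx-tasks.md => globus-compute-tasks.md} | 21 +++---
 psbench/argparse.py                           | 15 ++--
 .../__init__.py                               |  0
 .../__main__.py                               |  2 +-
 .../main.py                                   | 69 ++++++++++---------
 psbench/proxystore.py                         | 12 +++-
 pyproject.toml                                |  6 +-
 testing/{funcx.py => globus_compute.py}       | 32 ++++-----
 tests/argparse_test.py                        | 12 ++--
 .../__init__.py                               |  0
 .../main_test.py                              | 42 +++++------
 11 files changed, 110 insertions(+), 101 deletions(-)
 rename docs/{funcx-tasks.md => globus-compute-tasks.md} (62%)
 rename psbench/benchmarks/{funcx_tasks => globus_compute_tasks}/__init__.py (100%)
 rename psbench/benchmarks/{funcx_tasks => globus_compute_tasks}/__main__.py (60%)
 rename psbench/benchmarks/{funcx_tasks => globus_compute_tasks}/main.py (88%)
 rename testing/{funcx.py => globus_compute.py} (57%)
 rename tests/benchmarks/{funcx_tasks => globus_compute_tasks}/__init__.py (100%)
 rename tests/benchmarks/{funcx_tasks => globus_compute_tasks}/main_test.py (83%)

diff --git a/docs/funcx-tasks.md b/docs/globus-compute-tasks.md
similarity index 62%
rename from docs/funcx-tasks.md
rename to docs/globus-compute-tasks.md
index 3893ea3..f17ddc3 100644
--- a/docs/funcx-tasks.md
+++ b/docs/globus-compute-tasks.md
@@ -1,6 +1,9 @@
-# Simple FuncX + ProxyStore Test
+# Simple Globus Compute + ProxyStore Test
 
+**Note:** Globus Compute was formerly called funcX. Read about the change
+[here](https://globus-compute.readthedocs.io/en/latest/funcx_upgrade.html).
+
-This benchmark executes and times a series of simple FuncX tasks
+This benchmark executes and times a series of simple Globus Compute tasks
 that take arbitrarily sized byte array inputs and return arbitrarily
 sized byte arrays. The input/output sizes and ProxyStore backend can be
 specified.
@@ -16,10 +19,10 @@ specified.
    ```
    $ pip install .
    ```
-3. Create a FuncX endpoint.
+3. Create a Globus Compute endpoint.
    ```
-   $ funcx-endpoint configure psbench
-   $ funcx-endpoint start psbench
+   $ globus-compute-endpoint configure psbench
+   $ globus-compute-endpoint start psbench
    ```
    The returned endpoint UUID will be needed in the next step.
 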
@@ -30,16 +33,16 @@ Here's an example of a minimal working example that uses the ProxyStore file backend. ``` -$ python -m psbench.benchmarks.funcx_tasks \ - --funcx-endpoint {UUID} \ # UUID returned by funcx-endpoint start +$ python -m psbench.benchmarks.globus_compute_tasks \ + --globus-compute-endpoint {ENDPOINT_UUID} \ --input-sizes 100 1000 10000 \ --output-sizes 100 1000 10000 \ --ps-backend FILE --ps-file-dir /tmp/proxystore-dump ``` Omitting `--ps-backend` will result in data being passed directly via -FuncX. `--input-sizes` and `--output-sizes` take a list of options and will -result in a matrix of tasks being run. Individual task configurations can +Globus Compute. `--input-sizes` and `--output-sizes` take a list of options and +will result in a matrix of tasks being run. Individual task configurations can be repeated *n* times with the `--task-repeat` parameter. A sleep can be added to tasks with `--task-sleep`. Task timing stats can be saved to a CSV file with `--csv-file PATH` (this will append to existing files as well). diff --git a/psbench/argparse.py b/psbench/argparse.py index 19b5910..ed15b71 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -5,15 +5,16 @@ import sys -def add_funcx_options( +def add_globus_compute_options( parser: argparse.ArgumentParser, required: bool = False, ) -> None: - """Add CLI arguments for FuncX. + """Add CLI arguments for Globus Compute. Args: - parser (ArgumentParser): parser object to add FuncX arguments to. - required (bool): require the FuncX endpoint to be specified + parser (ArgumentParser): parser object to add Globus Compute arguments + to. + required (bool): require the Globus Compute endpoint to be specified (default: False). """ group = parser.add_argument_group( @@ -21,7 +22,7 @@ def add_funcx_options( description='FuncX Endpoint configuration', ) group.add_argument( - '--funcx-endpoint', + '--globus-compute-endpoint', metavar='UUID', required=required, help='FuncX endpoint for task execution', @@ -182,11 +183,11 @@ def add_proxystore_options( '--ps-address', metavar='ADDRESS', default=None, - help='Optional specify host IP address that can be used by the DIMs' + help='Optional specify host IP address that can be used by the DIMs', ) group.add_argument( '--ps-interface', metavar='INTERFACE', default=None, - help='Optionally provide interface name to be used by the DIMs' + help='Optionally provide interface name to be used by the DIMs', ) diff --git a/psbench/benchmarks/funcx_tasks/__init__.py b/psbench/benchmarks/globus_compute_tasks/__init__.py similarity index 100% rename from psbench/benchmarks/funcx_tasks/__init__.py rename to psbench/benchmarks/globus_compute_tasks/__init__.py diff --git a/psbench/benchmarks/funcx_tasks/__main__.py b/psbench/benchmarks/globus_compute_tasks/__main__.py similarity index 60% rename from psbench/benchmarks/funcx_tasks/__main__.py rename to psbench/benchmarks/globus_compute_tasks/__main__.py index 6af67a7..ef65a1f 100644 --- a/psbench/benchmarks/funcx_tasks/__main__.py +++ b/psbench/benchmarks/globus_compute_tasks/__main__.py @@ -1,6 +1,6 @@ from __future__ import annotations -from psbench.benchmarks.funcx_tasks.main import main +from psbench.benchmarks.globus_compute_tasks.main import main if __name__ == '__main__': raise SystemExit(main()) diff --git a/psbench/benchmarks/funcx_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py similarity index 88% rename from psbench/benchmarks/funcx_tasks/main.py rename to psbench/benchmarks/globus_compute_tasks/main.py index 
5c940ef..7a928ea 100644 --- a/psbench/benchmarks/funcx_tasks/main.py +++ b/psbench/benchmarks/globus_compute_tasks/main.py @@ -1,6 +1,6 @@ -"""FuncX + ProxyStore Simple Test. +"""Globus Compute + ProxyStore Simple Test. -Tests round trip function execution times to a FuncX endpoint with +Tests round trip function execution times to a Globus Compute endpoint with configurable function payload transfer methods, sizes, etc. """ from __future__ import annotations @@ -15,7 +15,7 @@ import uuid from typing import Sequence -import funcx +import globus_compute_sdk import proxystore from proxystore.proxy import Proxy from proxystore.store.base import Store @@ -23,7 +23,7 @@ from psbench import ipfs from psbench.argparse import add_dspaces_options -from psbench.argparse import add_funcx_options +from psbench.argparse import add_globus_compute_options from psbench.argparse import add_ipfs_options from psbench.argparse import add_logging_options from psbench.argparse import add_proxystore_options @@ -37,7 +37,7 @@ from psbench.tasks.pong import pong_proxy from psbench.utils import randbytes -logger = logging.getLogger('funcx-test') +logger = logging.getLogger('globus-compute-test') @dataclasses.dataclass @@ -62,15 +62,15 @@ class TaskStats: def time_task( *, - fx: funcx.FuncXExecutor, + gce: globus_compute_sdk.Executor, input_size: int, output_size: int, task_sleep: float, ) -> TaskStats: - """Execute and time a single FuncX task. + """Execute and time a single Globus Compute task. Args: - fx (FuncXExecutor): FuncX Executor to submit task through. + gce (Executor): Globus Compute Executor to submit task through. input_size (int): number of bytes to send as input to task. output_size (int): number of bytes task should return. task_sleep (int): number of seconds to sleep inside task. @@ -80,7 +80,7 @@ def time_task( """ data = randbytes(input_size) start = time.perf_counter_ns() - fut = fx.submit( + fut = gce.submit( pong, data, result_size=output_size, @@ -103,17 +103,17 @@ def time_task( def time_task_ipfs( *, - fx: funcx.FuncXExecutor, + gce: globus_compute_sdk.Executor, ipfs_local_dir: str, ipfs_remote_dir: str, input_size: int, output_size: int, task_sleep: float, ) -> TaskStats: - """Execute and time a single FuncX task with IPFS for data transfer. + """Execute and time a single Globus Compute task with IPFS for transfer. Args: - fx (FuncXExecutor): FuncX Executor to submit task through. + gce (Executor): Globus Compute Executor to submit task through. ipfs_local_dir (str): Local IPFS directory to write files to. ipfs_remote_dir (str): Remote IPFS directory to write files to. input_size (int): number of bytes to send as input to task. @@ -130,7 +130,7 @@ def time_task_ipfs( filepath = os.path.join(ipfs_local_dir, str(uuid.uuid4())) cid = ipfs.add_data(data, filepath) - fut = fx.submit( + fut = gce.submit( pong_ipfs, cid, ipfs_remote_dir, @@ -230,16 +230,16 @@ def time_task_dspaces( def time_task_proxy( *, - fx: funcx.FuncXExecutor, + gce: globus_compute_sdk.Executor, store: Store, input_size: int, output_size: int, task_sleep: float, ) -> TaskStats: - """Execute and time a single FuncX task with proxied inputs. + """Execute and time a single Globus Compute task with proxied inputs. Args: - fx (FuncXExecutor): FuncX Executor to submit task through. + gce (Executor): Globus Compute Executor to submit task through. store (Store): ProxyStore Store to use for proxying input/outputs. input_size (int): number of bytes to send as input to task. output_size (int): number of bytes task should return. 
@@ -252,7 +252,7 @@ def time_task_proxy( start = time.perf_counter_ns() proxy: Proxy[bytes] = store.proxy(data, evict=True) - fut = fx.submit( + fut = gce.submit( pong_proxy, proxy, evict_result=False, @@ -292,7 +292,7 @@ def time_task_proxy( def runner( *, - funcx_endpoint: str, + globus_compute_endpoint: str, store: Store | None, use_dspaces: bool, use_ipfs: bool, @@ -305,12 +305,14 @@ def runner( csv_file: str | None, ) -> None: """Run all task configurations and log results.""" - store_class_name = None if store is None else store.__class__.__name__ + store_connector_name = ( + None if store is None else store.connector.__class__.__name__ + ) logger.log( TESTING_LOG_LEVEL, 'Starting test runner\n' - f' - FuncX Endpoint: {funcx_endpoint}\n' - f' - ProxyStore backend: {store_class_name}\n' + f' - Globus Compute Endpoint: {globus_compute_endpoint}\n' + f' - ProxyStore backend: {store_connector_name}\n' f' - DataSpaces enabled: {use_dspaces}\n' f' - IPFS enabled: {use_ipfs}\n' f' - Task type: ping-pong\n' @@ -326,9 +328,8 @@ def runner( ) runner_start = time.perf_counter_ns() - fx = funcx.FuncXExecutor( - endpoint_id=funcx_endpoint, - funcx_client=funcx.FuncXClient(), + gce = globus_compute_sdk.Executor( + endpoint_id=globus_compute_endpoint, batch_size=1, ) @@ -340,7 +341,7 @@ def runner( for _ in range(task_repeat): if store is not None: stats = time_task_proxy( - fx=fx, + gce=gce, store=store, input_size=input_size, output_size=output_size, @@ -350,7 +351,7 @@ def runner( assert ipfs_local_dir is not None assert ipfs_remote_dir is not None stats = time_task_ipfs( - fx=fx, + gce=gce, ipfs_local_dir=ipfs_local_dir, ipfs_remote_dir=ipfs_remote_dir, input_size=input_size, @@ -366,7 +367,7 @@ def runner( ) else: stats = time_task( - fx=fx, + gce=gce, input_size=input_size, output_size=output_size, task_sleep=task_sleep, @@ -380,8 +381,6 @@ def runner( if csv_file is not None: csv_logger.log(stats) - fx.shutdown() - if csv_file is not None: csv_logger.close() if use_ipfs: @@ -395,9 +394,11 @@ def _remote_cleanup() -> None: assert ipfs_remote_dir is not None shutil.rmtree(ipfs_remote_dir) - fut = fx.submit(_remote_cleanup) + fut = gce.submit(_remote_cleanup) fut.result() + gce.shutdown() + runner_end = time.perf_counter_ns() logger.log( TESTING_LOG_LEVEL, @@ -406,11 +407,11 @@ def _remote_cleanup() -> None: def main(argv: Sequence[str] | None = None) -> int: - """Simple FuncX Task Benchmark with ProxyStore.""" # noqa: D401 + """Simple Globus Compute Task Benchmark with ProxyStore.""" # noqa: D401 argv = argv if argv is not None else sys.argv[1:] parser = argparse.ArgumentParser( - description='Simple FuncX task benchmark with ProxyStore.', + description='Simple Globus Compute task benchmark with ProxyStore.', formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( @@ -439,7 +440,7 @@ def main(argv: Sequence[str] | None = None) -> int: required=True, help='Task output size in bytes', ) - add_funcx_options(parser, required=True) + add_globus_compute_options(parser, required=True) add_logging_options(parser) add_proxystore_options(parser, required=False) add_ipfs_options(parser) @@ -451,7 +452,7 @@ def main(argv: Sequence[str] | None = None) -> int: store = init_store_from_args(args, metrics=True) runner( - funcx_endpoint=args.funcx_endpoint, + globus_compute_endpoint=args.globus_compute_endpoint, store=store, use_dspaces=args.dspaces, use_ipfs=args.ipfs, diff --git a/psbench/proxystore.py b/psbench/proxystore.py index 88a8c34..744181a 100644 --- a/psbench/proxystore.py +++ 
b/psbench/proxystore.py
@@ -57,9 +57,17 @@ def init_store_from_args(
             interface=args.ps_interface,
         )
     elif args.ps_backend == 'UCX':
-        connector = UCXConnector(port=args.ps_port, interface=args.ps_interface, address=args.ps_address)
+        connector = UCXConnector(
+            port=args.ps_port,
+            interface=args.ps_interface,
+            address=args.ps_address,
+        )
     elif args.ps_backend == 'ZMQ':
-        connector = ZeroMQConnector(port=args.ps_port, interface=args.ps_interface, address=args.ps_address)
+        connector = ZeroMQConnector(
+            port=args.ps_port,
+            interface=args.ps_interface,
+            address=args.ps_address,
+        )
     else:
         raise ValueError(f'Invalid backend: {args.ps_backend}')

diff --git a/pyproject.toml b/pyproject.toml
index 26097d0..794fbab 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,10 +20,10 @@ classifiers = [
 ]
 dependencies = [
     "colmena[funcx]==0.4.2",
-    "funcx==1.0.8",
-    "funcx-endpoint==1.0.8",
+    "globus-compute-endpoint==2.0.3",
+    "globus-compute-sdk==2.0.3",
     "proxystore[all]==0.5.0",
-    "redis==3.4.1",
+    "redis==4.5.5",
     "requests==2.28.2",
     "importlib-metadata; python_version<'3.8'",
 ]
diff --git a/testing/funcx.py b/testing/globus_compute.py
similarity index 57%
rename from testing/funcx.py
rename to testing/globus_compute.py
index 47baf5c..3cb7c68 100644
--- a/testing/funcx.py
+++ b/testing/globus_compute.py
@@ -8,16 +8,16 @@ from typing import TypeVar
 from unittest import mock

-import funcx
+import globus_compute_sdk

 RT = TypeVar('RT')


-class MockFuncXExecutor(funcx.FuncXExecutor):
-    """Mock FuncXExecutor."""
+class MockExecutor(globus_compute_sdk.Executor):
+    """Mock Executor."""

     def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Init MockFuncXExecutor."""
+        """Init MockExecutor."""
         pass

     def submit(
@@ -37,17 +37,13 @@ def shutdown(self, *args: Any, **kwargs: Any) -> None:


 @contextlib.contextmanager
-def mock_funcx() -> Generator[None, None, None]:
-    """Context manager that mocks FuncXClient and FuncXExecutor."""
-    with mock.patch('funcx.FuncXClient'):
-        with mock.patch(
-            'funcx.FuncXExecutor',
-            MockFuncXExecutor,
-        ):
-            yield
-
-
-def mock_executor() -> funcx.FuncXExecutor:
-    """Create a mock FuncXExectutor."""
-    with mock_funcx():
-        return funcx.FuncXExecutor(funcx.FuncXClient)
+def mock_globus_compute() -> Generator[None, None, None]:
+    """Context manager that mocks Globus Compute Executor."""
+    with mock.patch('globus_compute_sdk.Executor', MockExecutor):
+        yield
+
+
+def mock_executor() -> globus_compute_sdk.Executor:
+    """Create a mock Executor."""
+    with mock_globus_compute():
+        return globus_compute_sdk.Executor()
diff --git a/tests/argparse_test.py b/tests/argparse_test.py
index b95192f..159dbdb 100644
--- a/tests/argparse_test.py
+++ b/tests/argparse_test.py
@@ -5,20 +5,20 @@

 import pytest

-from psbench.argparse import add_funcx_options
+from psbench.argparse import add_globus_compute_options
 from psbench.argparse import add_logging_options
 from psbench.argparse import add_proxystore_options


-def test_add_funcx_options(capsys) -> None:
+def test_add_globus_compute_options(capsys) -> None:
     parser = argparse.ArgumentParser()
-    add_funcx_options(parser)
+    add_globus_compute_options(parser)
     parser.parse_args([])

-    args = parser.parse_args(['--funcx-endpoint', 'ABCD'])
-    assert args.funcx_endpoint == 'ABCD'
+    args = parser.parse_args(['--globus-compute-endpoint', 'ABCD'])
+    assert args.globus_compute_endpoint == 'ABCD'

     parser = argparse.ArgumentParser()
-    add_funcx_options(parser, required=True)
+    add_globus_compute_options(parser, required=True)
     # Suppress argparse error message
with mock.patch('argparse.ArgumentParser._print_message'): with pytest.raises(SystemExit): diff --git a/tests/benchmarks/funcx_tasks/__init__.py b/tests/benchmarks/globus_compute_tasks/__init__.py similarity index 100% rename from tests/benchmarks/funcx_tasks/__init__.py rename to tests/benchmarks/globus_compute_tasks/__init__.py diff --git a/tests/benchmarks/funcx_tasks/main_test.py b/tests/benchmarks/globus_compute_tasks/main_test.py similarity index 83% rename from tests/benchmarks/funcx_tasks/main_test.py rename to tests/benchmarks/globus_compute_tasks/main_test.py index 336d0c1..0282b52 100644 --- a/tests/benchmarks/funcx_tasks/main_test.py +++ b/tests/benchmarks/globus_compute_tasks/main_test.py @@ -12,21 +12,21 @@ from proxystore.store import Store from proxystore.store import unregister_store -from psbench.benchmarks.funcx_tasks.main import main -from psbench.benchmarks.funcx_tasks.main import runner -from psbench.benchmarks.funcx_tasks.main import time_task -from psbench.benchmarks.funcx_tasks.main import time_task_ipfs -from psbench.benchmarks.funcx_tasks.main import time_task_proxy -from testing.funcx import mock_executor -from testing.funcx import mock_funcx +from psbench.benchmarks.globus_compute_tasks.main import main +from psbench.benchmarks.globus_compute_tasks.main import runner +from psbench.benchmarks.globus_compute_tasks.main import time_task +from psbench.benchmarks.globus_compute_tasks.main import time_task_ipfs +from psbench.benchmarks.globus_compute_tasks.main import time_task_proxy +from testing.globus_compute import mock_executor +from testing.globus_compute import mock_globus_compute from testing.ipfs import mock_ipfs def test_time_task() -> None: - fx = mock_executor() + gce = mock_executor() stats = time_task( - fx=fx, + gce=gce, input_size=100, output_size=50, task_sleep=0.01, @@ -40,10 +40,10 @@ def test_time_task() -> None: def test_time_task_ipfs(tmp_path: pathlib.Path) -> None: with mock_ipfs(): - fx = mock_executor() + gce = mock_executor() stats = time_task_ipfs( - fx=fx, + gce=gce, ipfs_local_dir=str(tmp_path / 'local'), ipfs_remote_dir=str(tmp_path / 'remote'), input_size=100, @@ -57,7 +57,7 @@ def test_time_task_ipfs(tmp_path: pathlib.Path) -> None: assert stats.total_time_ms >= 10 stats = time_task_ipfs( - fx=fx, + gce=gce, ipfs_local_dir=str(tmp_path / 'local'), ipfs_remote_dir=str(tmp_path / 'remote'), input_size=100, @@ -72,12 +72,12 @@ def test_time_task_ipfs(tmp_path: pathlib.Path) -> None: def test_time_task_proxy() -> None: - fx = mock_executor() + gce = mock_executor() store = Store('test-time-task-store', LocalConnector(), metrics=True) register_store(store) stats = time_task_proxy( - fx=fx, + gce=gce, store=store, input_size=100, output_size=50, @@ -132,10 +132,10 @@ def test_runner( ipfs_local_dir.mkdir() ipfs_remote_dir.mkdir() - with mock_funcx(): + with mock_globus_compute(): with mock_ipfs(): runner( - funcx_endpoint=str(uuid.uuid4()), + globus_compute_endpoint=str(uuid.uuid4()), store=store, use_ipfs=use_ipfs, ipfs_local_dir=str(ipfs_local_dir), @@ -160,7 +160,7 @@ def test_runner_error() -> None: with Store('test-runner-store', LocalConnector()) as store: with pytest.raises(ValueError): runner( - funcx_endpoint=str(uuid.uuid4()), + globus_compute_endpoint=str(uuid.uuid4()), store=store, use_ipfs=True, ipfs_local_dir='/tmp/local/', @@ -173,11 +173,11 @@ def test_runner_error() -> None: ) -@mock.patch('psbench.benchmarks.funcx_tasks.main.runner') +@mock.patch('psbench.benchmarks.globus_compute_tasks.main.runner') def 
test_main(mock_runner) -> None: main( [ - '--funcx-endpoint', + '--globus-compute-endpoint', 'ABCD', '--input-sizes', '0', @@ -188,12 +188,12 @@ def test_main(mock_runner) -> None: ) with mock.patch( - 'psbench.benchmarks.funcx_tasks.main.init_store_from_args', + 'psbench.benchmarks.globus_compute_tasks.main.init_store_from_args', return_value=Store('test-main-store', LocalConnector()), ): main( [ - '--funcx-endpoint', + '--globus-compute-endpoint', 'ABCD', '--input-sizes', '0', From 0b5db75a28de0c659d16c7c8f5b4dfb84ee1c5af Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 14:16:51 -0500 Subject: [PATCH 14/31] Upgrade to Colmena v0.5.0 --- docs/colmena-rtt.md | 11 +++++---- psbench/argparse.py | 8 +++---- psbench/benchmarks/colmena_rtt/main.py | 29 +++++++++++++---------- pyproject.toml | 2 +- testing/colmena.py | 16 ++++++------- tests/benchmarks/colmena_rtt/main_test.py | 23 ++++++++++++------ tests/proxystore_test.py | 15 +++++++++--- tox.ini | 2 +- 8 files changed, 64 insertions(+), 42 deletions(-) diff --git a/docs/colmena-rtt.md b/docs/colmena-rtt.md index 97c9d69..3197f94 100644 --- a/docs/colmena-rtt.md +++ b/docs/colmena-rtt.md @@ -13,10 +13,11 @@ This benchmark measures roundtrip time with Colmena. ```bash $ pip install . ``` -3. Colmena can be used with Parsl or FuncX. With FuncX, an endpoint needs to be created. +3. Colmena can be used with Parsl or Globus Compute. + With Globus Compute, an endpoint needs to be created. ```bash - $ funcx-endpoint configure psbench - $ funcx-endpoint start psbench + $ globus-compute-endpoint configure psbench + $ globus-compute-endpoint start psbench ``` The returned endpoint UUID will be needed in the next step. @@ -25,10 +26,10 @@ This benchmark measures roundtrip time with Colmena. The benchmark can be configured using CLI parameters. The full list of options can be found using `--help`. -**FuncX** +**Globus Compute** ```bash $ python -m psbench.benchmarks.colmena_rtt \ - --funcx --endpoint b8aba48a-386d-4977-b5c9-9bcbbaebd0bf \ + --globus-compute --endpoint b8aba48a-386d-4977-b5c9-9bcbbaebd0bf \ --input-sizes 100 1000 10000 \ --output-sizes 100 1000 10000 \ --task-repeat 5 diff --git a/psbench/argparse.py b/psbench/argparse.py index ed15b71..7af0a27 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -18,14 +18,14 @@ def add_globus_compute_options( (default: False). 
""" group = parser.add_argument_group( - title='FuncX', - description='FuncX Endpoint configuration', + title='Globus Compute', + description='Globus Compute Endpoint configuration', ) group.add_argument( '--globus-compute-endpoint', metavar='UUID', required=required, - help='FuncX endpoint for task execution', + help='Globus Compute endpoint for task execution', ) @@ -183,7 +183,7 @@ def add_proxystore_options( '--ps-address', metavar='ADDRESS', default=None, - help='Optional specify host IP address that can be used by the DIMs', + help='Optionally specify host IP address that can be used by the DIMs', ) group.add_argument( '--ps-interface', diff --git a/psbench/benchmarks/colmena_rtt/main.py b/psbench/benchmarks/colmena_rtt/main.py index 44e7834..4b7e637 100644 --- a/psbench/benchmarks/colmena_rtt/main.py +++ b/psbench/benchmarks/colmena_rtt/main.py @@ -21,13 +21,13 @@ from typing import NamedTuple from typing import Sequence -import funcx +import globus_compute_sdk from colmena.models import Result from colmena.queue.base import ColmenaQueues from colmena.queue.python import PipeQueues from colmena.queue.redis import RedisQueues from colmena.task_server.base import BaseTaskServer -from colmena.task_server.funcx import FuncXTaskServer +from colmena.task_server.globus import GlobusComputeTaskServer from colmena.task_server.parsl import ParslTaskServer from colmena.thinker import agent from colmena.thinker import BaseThinker @@ -139,7 +139,7 @@ def consumer(self) -> None: result, result.task_info['input_size'], result.task_info['output_size'], - self.store.__class__.__name__ + self.store.connector.__class__.__name__ if self.store is not None else '', ) @@ -226,9 +226,9 @@ def main(argv: Sequence[str] | None = None) -> int: ) backend_group = parser.add_mutually_exclusive_group(required=True) backend_group.add_argument( - '--funcx', + '--globus-compute', action='store_true', - help='Use the FuncX Colmena Task Server', + help='Use the Globus Compute Colmena Task Server', ) backend_group.add_argument( '--parsl', @@ -236,11 +236,11 @@ def main(argv: Sequence[str] | None = None) -> int: help='Use the Parsl Colmena Task Server', ) - funcx_group = parser.add_argument_group() - funcx_group.add_argument( + globus_compute_group = parser.add_argument_group() + globus_compute_group.add_argument( '--endpoint', - required='--funcx' in sys.argv, - help='FuncX endpoint for task execution', + required='--globus-compute' in sys.argv, + help='Globus Compute endpoint for task execution', ) task_group = parser.add_argument_group() @@ -329,15 +329,18 @@ def main(argv: Sequence[str] | None = None) -> int: ) doer: BaseTaskServer - if args.funcx: - fcx = funcx.FuncXClient() - doer = FuncXTaskServer({target_function: args.endpoint}, fcx, queues) + if args.globus_compute: + doer = GlobusComputeTaskServer( + {target_function: args.endpoint}, + globus_compute_sdk.Client(), + queues, + ) elif args.parsl: config = get_config(output_dir) doer = ParslTaskServer([target_function], queues, config) else: raise AssertionError( - '--funcx and --parsl are part of a required mutex group.', + '--globus-compute and --parsl are part of a required mutex group.', ) thinker = Thinker( diff --git a/pyproject.toml b/pyproject.toml index 794fbab..f0e8dda 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", ] dependencies = [ - "colmena[funcx]==0.4.2", + "colmena[globus]==0.5.0", "globus-compute-endpoint==2.0.3", "globus-compute-sdk==2.0.3", 
"proxystore[all]==0.5.0", diff --git a/testing/colmena.py b/testing/colmena.py index 02cd95c..8b01c0b 100644 --- a/testing/colmena.py +++ b/testing/colmena.py @@ -4,29 +4,29 @@ from typing import Any from typing import Callable -import funcx +import globus_compute_sdk from colmena.models import Result from colmena.queue.base import ColmenaQueues from colmena.task_server.base import FutureBasedTaskServer from colmena.task_server.base import run_and_record_timing -from testing.funcx import MockFuncXExecutor +from testing.globus_compute import MockExecutor -class MockFuncXTaskServer(FutureBasedTaskServer): - """Mock FuncXTaskServer.""" +class MockGlobusComputeTaskServer(FutureBasedTaskServer): + """Mock GlobusComputeTaskServer.""" def __init__( self, methods: dict[Callable[[Any], Any], str], - funcx_client: funcx.FuncXClient, + client: globus_compute_sdk.Client, queues: ColmenaQueues, timeout: int | None = None, batch_size: int = 128, ) -> None: - """Init MockFuncXTaskServer.""" - super().__init__(queues, timeout) + """Init MockGlobusComputeTaskServer.""" self._methods = {f.__name__: f for f in methods} + super().__init__(queues, self._methods.keys(), timeout) def _submit(self, task: Result, topic: str) -> Future[Any]: func = self._methods[task.method] @@ -38,7 +38,7 @@ def _submit(self, task: Result, topic: str) -> Future[Any]: return fut def _setup(self) -> None: - self.fx_exec = MockFuncXExecutor() + self.fx_exec = MockExecutor() def _cleanup(self) -> None: pass diff --git a/tests/benchmarks/colmena_rtt/main_test.py b/tests/benchmarks/colmena_rtt/main_test.py index 112f9cd..12cf17c 100644 --- a/tests/benchmarks/colmena_rtt/main_test.py +++ b/tests/benchmarks/colmena_rtt/main_test.py @@ -8,8 +8,8 @@ import redis from psbench.benchmarks.colmena_rtt.main import main -from testing.colmena import MockFuncXTaskServer -from testing.funcx import mock_funcx +from testing.colmena import MockGlobusComputeTaskServer +from testing.globus_compute import mock_globus_compute REDIS_HOST = 'localhost' REDIS_PORT = 6379 @@ -65,18 +65,27 @@ def test_parsl_e2e( assert main(args) == 0 -def test_funcx_e2e(tmp_path: pathlib.Path, default_args: list[str]) -> None: +def test_globus_compute_e2e( + tmp_path: pathlib.Path, + default_args: list[str], +) -> None: run_dir = tmp_path / 'runs' run_dir.mkdir() - args = ['--funcx', '--endpoint', 'ENDPOINT', '--output-dir', str(run_dir)] + args = [ + '--globus-compute', + '--endpoint', + 'ENDPOINT', + '--output-dir', + str(run_dir), + ] args += default_args with mock.patch( - 'psbench.benchmarks.colmena_rtt.main.FuncXTaskServer', - MockFuncXTaskServer, + 'psbench.benchmarks.colmena_rtt.main.GlobusComputeTaskServer', + MockGlobusComputeTaskServer, ): - with mock_funcx(): + with mock_globus_compute(): assert main(args) == 0 diff --git a/tests/proxystore_test.py b/tests/proxystore_test.py index 6074ff9..b3d5fd3 100644 --- a/tests/proxystore_test.py +++ b/tests/proxystore_test.py @@ -32,13 +32,22 @@ 'MARGO', MargoConnector, { - 'ps_host': 'localhost', 'ps_port': 1234, + 'ps_address': None, + 'ps_interface': 'lo', 'ps_margo_protocol': 'tcp', }, ), - ('UCX', UCXConnector, {'ps_host': 'localhost', 'ps_port': 1234}), - ('ZMQ', ZeroMQConnector, {'ps_host': 'localhost', 'ps_port': 1234}), + ( + 'UCX', + UCXConnector, + {'ps_port': 1234, 'ps_address': None, 'ps_interface': 'lo'}, + ), + ( + 'ZMQ', + ZeroMQConnector, + {'ps_port': 1234, 'ps_address': None, 'ps_interface': 'lo'}, + ), (None, None, {}), ('INVALID_BACKEND', None, {}), ), diff --git a/tox.ini b/tox.ini index 34d10ca..5e06b4f 
100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ envlist = py37, py38, py39, py310, py311, pre-commit [testenv] extras = dev commands = - # dill>=0.3.6 required for Python 3.11 but FuncX 1.0.8 pins it to 0.3.5.1 + # dill>=0.3.6 required for Python 3.11 but Globus Compute 2.0 pins it to 0.3.5.1 pip install -U dill==0.3.6 coverage erase coverage run -m pytest {posargs} From 10ce293711e6e11ab03148bc61a950df3c649ae3 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 14:17:32 -0500 Subject: [PATCH 15/31] Bump to psbench v0.1.0 --- pyproject.toml | 2 +- testing/globus_compute.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f0e8dda..f4c3528 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "psbench" -version = "0.0.1" +version = "0.1.0" authors = [ {name = "Greg Pauloski", email = "jgpauloski@uchicago.edu"}, ] diff --git a/testing/globus_compute.py b/testing/globus_compute.py index 3cb7c68..9c3d370 100644 --- a/testing/globus_compute.py +++ b/testing/globus_compute.py @@ -39,7 +39,9 @@ def shutdown(self, *args: Any, **kwargs: Any) -> None: @contextlib.contextmanager def mock_globus_compute() -> Generator[None, None, None]: """Context manager that mocks Globus Compute Executor.""" - with mock.patch('globus_compute_sdk.Executor', MockExecutor): + with mock.patch( + 'globus_compute_sdk.Client', + ), mock.patch('globus_compute_sdk.Executor', MockExecutor): yield From 463f5a4aadb3e06751564b3fb1bf0d0b7caaf9eb Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 15:56:24 -0500 Subject: [PATCH 16/31] Update PR tagging for generating releases --- .github/dependabot.yml | 2 ++ .github/pull_request_template.md | 33 +++++++++++++++++++----------- .github/release.yml | 35 ++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 .github/release.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b608ffc..9ff1c8c 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -6,3 +6,5 @@ updates: schedule: # Check for updates to GitHub Actions every week interval: "weekly" + labels: + - "development" diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index d23fa73..ee896aa 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,23 +1,31 @@ + + # Description ### Fixes - + -- Fixes #XX - Fixes #XX ### Type of Change - - -- [ ] Bug fix (non-breaking change which fixes an issue) -- [ ] New feature (non-breaking change which adds functionality) -- [ ] Refactoring (internal implementation changes) -- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) -- [ ] Documentation update (no changes to the code) -- [ ] CI change (changes to CI workflows, packages, templates, etc.) 
-- [ ] Version changes (changes to the package or dependency versions) + + +- [ ] Breaking Change (fix or enhancement which changes existing semantics of the public interface) +- [ ] Enhancement (new features or improvements to existing functionality) +- [ ] Bug (fixes for a bug or issue) +- [ ] Internal (refactoring, style changes, testing, optimizations) +- [ ] Documentation update (changes to documentation or examples) +- [ ] Package (dependencies, versions, package metadata) +- [ ] Development (CI workflows, pre-commit, linters, templates) +- [ ] Security (security related changes) ## Testing @@ -26,7 +34,8 @@ ## Pull Request Checklist Please confirm the PR meets the following requirements. -- [ ] Code changes pass `pre-commit` (e.g., black, flake8, mypy, etc.). +- [ ] Tags added to PR (e.g., breaking, bug, enhancement, internal, documentation, package, development, security). +- [ ] Code changes pass `pre-commit` (e.g., black, mypy, ruff, etc.). - [ ] Tests have been added to show the fix is effective or that the new feature works. - [ ] New and existing unit tests pass locally with the changes. - [ ] Docs have been updated and reviewed if relevant. diff --git a/.github/release.yml b/.github/release.yml new file mode 100644 index 0000000..1bf3aaf --- /dev/null +++ b/.github/release.yml @@ -0,0 +1,35 @@ +changelog: + exclude: + labels: + - ignore-for-release + authors: + - pre-commit-ci + categories: + - title: Breaking Changes + labels: + - breaking + - title: Enhancements + labels: + - enhancement + - title: Bug Fixes + labels: + - bug + - title: Internal Changes + labels: + - internal + - title: Documentation + labels: + - documentation + - title: Package Changes + labels: + - package + - title: Development Workflows + labels: + - development + - title: Security + labels: + - security + # All PRs not tagged as the above + - title: Other Changes + labels: + - "*" From d16a19a7255371e6d7cf6c7ff1735cb456f85520 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 15:57:11 -0500 Subject: [PATCH 17/31] Add cache cleanup action --- .github/workflows/cache.yml | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/workflows/cache.yml diff --git a/.github/workflows/cache.yml b/.github/workflows/cache.yml new file mode 100644 index 0000000..e18cc8d --- /dev/null +++ b/.github/workflows/cache.yml @@ -0,0 +1,34 @@ +name: cache-cleanup + +on: + pull_request: + types: + - closed + +jobs: + cleanup: + runs-on: ubuntu-latest + steps: + - name: Check out code + uses: actions/checkout@v3 + + - name: Cleanup + run: | + gh extension install actions/gh-actions-cache + + REPO=${{ github.repository }} + BRANCH="refs/pull/${{ github.event.pull_request.number }}/merge" + + echo "Fetching list of cache key" + cacheKeysForPR=$(gh actions-cache list -R $REPO -B $BRANCH | cut -f 1 ) + + ## Setting this to not fail the workflow while deleting cache keys. + set +e + echo "Deleting caches..." 
+          for cacheKey in $cacheKeysForPR
+          do
+              gh actions-cache delete $cacheKey -R $REPO -B $BRANCH --confirm
+          done
+          echo "Done"
+        env:
+          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

From 6f7abfdda01235fac275664e114037123aab6ea1 Mon Sep 17 00:00:00 2001
From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com>
Date: Mon, 15 May 2023 16:03:04 -0500
Subject: [PATCH 18/31] Update linting configurations

---
 .pre-commit-config.yaml                            |  2 +-
 .../benchmarks/globus_compute_tasks/main.py        |  2 +-
 psbench/tasks/pong.py                              |  3 +-
 pyproject.toml                                     | 57 +++++++++++++++++--
 4 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a32b041..cf7b291 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -13,7 +13,6 @@ repos:
       - id: check-yaml
       - id: check-merge-conflict
       - id: name-tests-test
-      - id: double-quote-string-fixer
 -   repo: 'https://github.com/psf/black'
     rev: 23.3.0
     hooks:
       - id: black
@@ -28,6 +27,7 @@ repos:
       - id: ruff
         args:
           - '--fix'
+          - '--exit-non-zero-on-fix'
 -   repo: 'https://github.com/pre-commit/mirrors-mypy'
     rev: v1.2.0
     hooks:
       - id: mypy
diff --git a/psbench/benchmarks/globus_compute_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py
index 7a928ea..4b4ba40 100644
--- a/psbench/benchmarks/globus_compute_tasks/main.py
+++ b/psbench/benchmarks/globus_compute_tasks/main.py
@@ -407,7 +407,7 @@ def _remote_cleanup() -> None:


 def main(argv: Sequence[str] | None = None) -> int:
-    """Simple Globus Compute Task Benchmark with ProxyStore."""  # noqa: D401
+    """Simple Globus Compute Task Benchmark with ProxyStore."""
     argv = argv if argv is not None else sys.argv[1:]

     parser = argparse.ArgumentParser(
diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py
index ca51e0a..4616424 100644
--- a/psbench/tasks/pong.py
+++ b/psbench/tasks/pong.py
@@ -171,7 +171,8 @@ def pong_proxy(
         resolve_async(data)
         time.sleep(sleep)

-    assert isinstance(data, bytes) and isinstance(data, Proxy)
+    assert isinstance(data, bytes)
+    assert isinstance(data, Proxy)

     result_data = randbytes(result_size)
     store = get_store(data)
diff --git a/pyproject.toml b/pyproject.toml
index f4c3528..c35a8d8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -92,12 +92,61 @@ allow_incomplete_defs = true
 allow_untyped_defs = true

 [tool.ruff]
-# pycodestyle, pyflakes, flake8-builtins, flake8-bugbear, isort, pep8-naming,
-# pydocstyle, flake8-debugger, flake8-commas
-select = ["E", "F", "A", "B", "I", "N", "D", "T10", "COM"]
-extend-ignore = ["D10"]
+# See all rules here: https://beta.ruff.rs/docs/rules
+select = [
+    # pyflakes
+    "F",
+    # pycodestyle
+    "E",
+    # isort
+    "I",
+    # pep8-naming
+    "N",
+    # pydocstyle
+    "D",
+    # pyupgrade
+    "UP",
+    # flake8-bugbear
+    "B",
+    # flake8-builtins
+    "A",
+    # flake8-commas
+    "COM",
+    # flake8-comprehensions
+    "C4",
+    # flake8-implicit-str-concat
+    "ISC",
+    # flake8-pytest-style
+    "PT",
+    # flake8-quotes
+    "Q",
+    # flake8-debugger
+    "T10",
+    # PyLint
+    "PLE", "PLW",
+    # ruff-specific
+    "RUF",
+]
+extend-ignore = [
+    "D10",
+    # Allow builtin attribute shadowing
+    "A003",
+    # Ruff will change all([generator]) to all(generator) because all/any
+    # accept generator expressions directly and the list comprehension is not needed.
+    # However, coverage marks unfinished generators as not covered and
+    # all/any can early exit before exhausting the generator.
+ "C419", + # Allow pytest.raises() without match + "PT011", +] target-version = "py37" +[tool.ruff.flake8-pytest-style] +parametrize-values-type = "tuple" + +[tool.ruff.flake8-quotes] +inline-quotes = "single" + [tool.ruff.isort] force-single-line = true known-first-party = ["psbench", "test", "testing"] From 553aa934b1b271002c87e854309c234352718ded Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 16:03:59 -0500 Subject: [PATCH 19/31] Update package metadata --- pyproject.toml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c35a8d8..eb06032 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,10 @@ build-backend = "setuptools.build_meta" name = "psbench" version = "0.1.0" authors = [ + {name = "Globus Labs"}, + {name = "Greg Pauloski", email = "jgpauloski@uchicago.edu"}, +] +maintainers = [ {name = "Greg Pauloski", email = "jgpauloski@uchicago.edu"}, ] description = "ProxyStore benchmark suite." @@ -29,8 +33,9 @@ dependencies = [ ] [project.urls] -repository = "https://github.com/proxystore/proxystore-benchmarks" +homepage = "https://proxystore.dev" documentation = "https://proxystore.readthedocs.io" +repository = "https://github.com/proxystore/proxystore-benchmarks" [project.optional-dependencies] dev = [ From 1f09a5a08144d0230ffde4d2b757b964a52de260 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Mon, 15 May 2023 16:18:01 -0500 Subject: [PATCH 20/31] Autofix ruff linting errors --- testing/mocking.py | 2 +- tests/benchmarks/colmena_rtt/main_test.py | 11 ++++---- tests/benchmarks/endpoint_qps/routes_test.py | 2 +- .../globus_compute_tasks/main_test.py | 26 ++++++++++++------- .../remote_ops/endpoint_ops_test.py | 10 +++---- tests/benchmarks/remote_ops/main_test.py | 4 +-- tests/benchmarks/remote_ops/redis_ops_test.py | 4 +-- tests/ipfs_test.py | 2 +- tests/proxystore_test.py | 2 +- tests/tasks/pong_test.py | 15 +++++++---- 10 files changed, 45 insertions(+), 33 deletions(-) diff --git a/testing/mocking.py b/testing/mocking.py index 776f063..37556eb 100644 --- a/testing/mocking.py +++ b/testing/mocking.py @@ -25,7 +25,7 @@ def get(self, key: str) -> Any: return self.data[key] return None # pragma: no cover - def set( # noqa: A003 + def set( self, key: str, value: str | bytes | int | float, diff --git a/tests/benchmarks/colmena_rtt/main_test.py b/tests/benchmarks/colmena_rtt/main_test.py index 12cf17c..de5ab7f 100644 --- a/tests/benchmarks/colmena_rtt/main_test.py +++ b/tests/benchmarks/colmena_rtt/main_test.py @@ -1,7 +1,6 @@ from __future__ import annotations import pathlib -from typing import Generator from unittest import mock import pytest @@ -25,9 +24,9 @@ redis_available = False -@pytest.fixture -def default_args(tmp_path) -> Generator[list[str], None, None]: - yield [ +@pytest.fixture() +def default_args(tmp_path) -> list[str]: + return [ '--input-sizes', '100', '1000', @@ -40,7 +39,7 @@ def default_args(tmp_path) -> Generator[list[str], None, None]: @pytest.mark.parametrize( - 'use_proxystore,use_csv,args', + ('use_proxystore', 'use_csv', 'args'), ( (True, True, []), (False, False, ['--reuse-inputs']), @@ -56,7 +55,7 @@ def test_parsl_e2e( run_dir = tmp_path / 'runs' run_dir.mkdir() - args = ['--parsl', '--output-dir', str(run_dir)] + args + default_args + args = ['--parsl', '--output-dir', str(run_dir), *args, *default_args] if use_proxystore: args += ['--ps-backend', 'FILE', '--ps-file-dir', 
str(run_dir / 'ps')] if use_csv: diff --git a/tests/benchmarks/endpoint_qps/routes_test.py b/tests/benchmarks/endpoint_qps/routes_test.py index 281a294..96192dc 100644 --- a/tests/benchmarks/endpoint_qps/routes_test.py +++ b/tests/benchmarks/endpoint_qps/routes_test.py @@ -17,7 +17,7 @@ QUERIES = 10 -@pytest.fixture +@pytest.fixture() def endpoint_store() -> Generator[EndpointStore, None, None]: with mock.patch('proxystore.store.endpoint.EndpointStore'): from proxystore.store import endpoint diff --git a/tests/benchmarks/globus_compute_tasks/main_test.py b/tests/benchmarks/globus_compute_tasks/main_test.py index 0282b52..49623ef 100644 --- a/tests/benchmarks/globus_compute_tasks/main_test.py +++ b/tests/benchmarks/globus_compute_tasks/main_test.py @@ -89,19 +89,27 @@ def test_time_task_proxy() -> None: assert stats.output_size_bytes == 50 assert stats.task_sleep_seconds == 0.01 assert stats.total_time_ms >= 10 - assert stats.input_get_ms is not None and stats.input_get_ms > 0 - assert stats.input_put_ms is not None and stats.input_put_ms > 0 - assert stats.input_proxy_ms is not None and stats.input_proxy_ms > 0 - assert stats.input_resolve_ms is not None and stats.input_resolve_ms > 0 - assert stats.output_get_ms is not None and stats.output_get_ms > 0 - assert stats.output_put_ms is not None and stats.output_put_ms > 0 - assert stats.output_proxy_ms is not None and stats.output_proxy_ms > 0 - assert stats.output_resolve_ms is not None and stats.output_resolve_ms > 0 + assert stats.input_get_ms is not None + assert stats.input_get_ms > 0 + assert stats.input_put_ms is not None + assert stats.input_put_ms > 0 + assert stats.input_proxy_ms is not None + assert stats.input_proxy_ms > 0 + assert stats.input_resolve_ms is not None + assert stats.input_resolve_ms > 0 + assert stats.output_get_ms is not None + assert stats.output_get_ms > 0 + assert stats.output_put_ms is not None + assert stats.output_put_ms > 0 + assert stats.output_proxy_ms is not None + assert stats.output_proxy_ms > 0 + assert stats.output_resolve_ms is not None + assert stats.output_resolve_ms > 0 unregister_store(store) @pytest.mark.parametrize( - 'use_ipfs,use_proxystore,log_to_csv', + ('use_ipfs', 'use_proxystore', 'log_to_csv'), ((False, True, False), (True, False, False), (False, False, True)), ) def test_runner( diff --git a/tests/benchmarks/remote_ops/endpoint_ops_test.py b/tests/benchmarks/remote_ops/endpoint_ops_test.py index bb88ee4..483fb65 100644 --- a/tests/benchmarks/remote_ops/endpoint_ops_test.py +++ b/tests/benchmarks/remote_ops/endpoint_ops_test.py @@ -11,31 +11,31 @@ @pytest_asyncio.fixture -@pytest.mark.asyncio +@pytest.mark.asyncio() async def endpoint() -> AsyncGenerator[Endpoint, None]: async with Endpoint('test-ep', uuid.uuid4()) as ep: yield ep -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_evict(endpoint: Endpoint) -> None: times = await ops.test_evict(endpoint, None, 2) assert len(times) == 2 -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_exists(endpoint: Endpoint) -> None: times = await ops.test_exists(endpoint, None, 2) assert len(times) == 2 -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_get(endpoint: Endpoint) -> None: times = await ops.test_get(endpoint, None, 100, 2) assert len(times) == 2 -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_set(endpoint: Endpoint) -> None: times = await ops.test_set(endpoint, None, 100, 2) assert len(times) == 2 diff --git a/tests/benchmarks/remote_ops/main_test.py b/tests/benchmarks/remote_ops/main_test.py 
index 986e40c..12a8891 100644 --- a/tests/benchmarks/remote_ops/main_test.py +++ b/tests/benchmarks/remote_ops/main_test.py @@ -18,7 +18,7 @@ from testing.mocking import MockStrictRedis -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_runner_endpoint() -> None: await runner_endpoint( None, @@ -40,7 +40,7 @@ def test_runner_redis() -> None: ) -@pytest.mark.asyncio +@pytest.mark.asyncio() async def test_csv_logging_endpoint() -> None: with tempfile.NamedTemporaryFile() as f: assert len(f.readlines()) == 0 diff --git a/tests/benchmarks/remote_ops/redis_ops_test.py b/tests/benchmarks/remote_ops/redis_ops_test.py index 4dba4e4..19e62f7 100644 --- a/tests/benchmarks/remote_ops/redis_ops_test.py +++ b/tests/benchmarks/remote_ops/redis_ops_test.py @@ -10,9 +10,9 @@ from testing.mocking import MockStrictRedis -@pytest.fixture +@pytest.fixture() def client() -> Generator[redis.StrictRedis[Any], None, None]: - yield MockStrictRedis() # type: ignore + return MockStrictRedis() # type: ignore def test_evict(client: redis.StrictRedis[Any]) -> None: diff --git a/tests/ipfs_test.py b/tests/ipfs_test.py index 29454ca..bf8d107 100644 --- a/tests/ipfs_test.py +++ b/tests/ipfs_test.py @@ -11,7 +11,7 @@ def test_add_data(tmp_path: pathlib.Path) -> None: cid = str(uuid.uuid4()) filepath = str(tmp_path / str(uuid.uuid4())) - output = f'add {cid} {filepath}'.encode('utf-8') + output = f'add {cid} {filepath}'.encode() with mock.patch('subprocess.check_output', return_value=output): found_cid = add_data(b'data', filepath) diff --git a/tests/proxystore_test.py b/tests/proxystore_test.py index b3d5fd3..43afdc4 100644 --- a/tests/proxystore_test.py +++ b/tests/proxystore_test.py @@ -18,7 +18,7 @@ @pytest.mark.parametrize( - 'backend,backend_type,kwargs', + ('backend', 'backend_type', 'kwargs'), ( ('ENDPOINT', EndpointConnector, {'ps_endpoints': ['abcd']}), ('FILE', FileConnector, {'ps_file_dir': '/tmp/file'}), diff --git a/tests/tasks/pong_test.py b/tests/tasks/pong_test.py index b5337bf..4707af5 100644 --- a/tests/tasks/pong_test.py +++ b/tests/tasks/pong_test.py @@ -52,7 +52,8 @@ def test_pong_proxy() -> None: end = time.perf_counter_ns() assert stats is None - assert isinstance(result_data, Proxy) and isinstance(result_data, bytes) + assert isinstance(result_data, Proxy) + assert isinstance(result_data, bytes) assert len(result_data) == 10 assert (end - start) / 1e9 >= 0.01 unregister_store(store) @@ -64,10 +65,14 @@ def test_pong_proxy_stats() -> None: input_data: Proxy[bytes] = store.proxy(b'abcd') _, stats = pong_proxy(input_data, result_size=10) assert stats is not None - assert stats.input_get_ms is not None and stats.input_get_ms > 0 - assert stats.input_resolve_ms is not None and stats.input_resolve_ms > 0 - assert stats.output_put_ms is not None and stats.output_put_ms > 0 - assert stats.output_proxy_ms is not None and stats.output_proxy_ms > 0 + assert stats.input_get_ms is not None + assert stats.input_get_ms > 0 + assert stats.input_resolve_ms is not None + assert stats.input_resolve_ms > 0 + assert stats.output_put_ms is not None + assert stats.output_put_ms > 0 + assert stats.output_proxy_ms is not None + assert stats.output_proxy_ms > 0 unregister_store(store) From 26206975c664bc21d11f90b4e5441040afb1a7dc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 16 May 2023 00:13:13 +0000 Subject: [PATCH 21/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 
8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.265 → v0.0.267](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.265...v0.0.267) - [github.com/pre-commit/mirrors-mypy: v1.2.0 → v1.3.0](https://github.com/pre-commit/mirrors-mypy/compare/v1.2.0...v1.3.0) --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cf7b291..13cccab 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,14 +22,14 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.265 + rev: v0.0.267 hooks: - id: ruff args: - '--fix' - '--exit-non-zero-on-fix' - repo: 'https://github.com/pre-commit/mirrors-mypy' - rev: v1.2.0 + rev: v1.3.0 hooks: - id: mypy additional_dependencies: From 8fc05860e2af2c0b58b075e6c1f35898e59cbd4f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 23 May 2023 00:03:26 +0000 Subject: [PATCH 22/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.267 → v0.0.269](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.267...v0.0.269) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 13cccab..4396ad5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.267 + rev: v0.0.269 hooks: - id: ruff args: From 30d9d9c6466770f349bbb0551a5badb1c02db96e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 29 May 2023 23:29:21 +0000 Subject: [PATCH 23/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.269 → v0.0.270](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.269...v0.0.270) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4396ad5..25dc1c3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.269 + rev: v0.0.270 hooks: - id: ruff args: From 569fc3bac5dfc5a4292dbf702f5d1deec64f21fc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 13 Jun 2023 00:40:38 +0000 Subject: [PATCH 24/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/charliermarsh/ruff-pre-commit: v0.0.270 → v0.0.272](https://github.com/charliermarsh/ruff-pre-commit/compare/v0.0.270...v0.0.272) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 25dc1c3..0cf9898 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,7 @@ repos: hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.0.270 + rev: v0.0.272 hooks: - id: ruff args: From 
17c917a070ac70092cbc27841cd5c7e4677c6990 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 01:04:09 +0000 Subject: [PATCH 25/31] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/codespell-project/codespell: v2.2.4 → v2.2.5](https://github.com/codespell-project/codespell/compare/v2.2.4...v2.2.5) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0cf9898..74f4168 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,7 +18,7 @@ repos: hooks: - id: black - repo: 'https://github.com/codespell-project/codespell' - rev: v2.2.4 + rev: v2.2.5 hooks: - id: codespell - repo: 'https://github.com/charliermarsh/ruff-pre-commit' From 5331d29426281cf202698e6b75cd8cc340608b28 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Sat, 24 Jun 2023 11:01:11 -0700 Subject: [PATCH 26/31] Bump ProxyStore dependency to v0.5.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index eb06032..59333a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ dependencies = [ "colmena[globus]==0.5.0", "globus-compute-endpoint==2.0.3", "globus-compute-sdk==2.0.3", - "proxystore[all]==0.5.0", + "proxystore[all]==0.5.1", "redis==4.5.5", "requests==2.28.2", "importlib-metadata; python_version<'3.8'", From d4893acb17fe94fe3aa39a1484d6419f7bbfd14a Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Sat, 24 Jun 2023 11:05:04 -0700 Subject: [PATCH 27/31] Bump to psbench v0.1.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 59333a4..62d3d55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "psbench" -version = "0.1.0" +version = "0.1.1" authors = [ {name = "Globus Labs"}, {name = "Greg Pauloski", email = "jgpauloski@uchicago.edu"}, From 22ede228648eb0bad8a0db154ef7b9f3b2545f3e Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Sat, 24 Jun 2023 11:55:31 -0700 Subject: [PATCH 28/31] Add zenodo badge --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index cc903d2..0411da5 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,8 @@ # ProxyStore Benchmark Suite +[![DOI](https://zenodo.org/badge/517741889.svg)](https://zenodo.org/badge/latestdoi/517741889) +[![tests](https://github.com/proxystore/proxystore-benchmarks/actions/workflows/tests.yml/badge.svg)](https://github.com/proxystore/proxystore-benchmarks/actions) [![pre-commit.ci status](https://results.pre-commit.ci/badge/github/proxystore/proxystore-benchmarks/main.svg)](https://results.pre-commit.ci/latest/github/proxystore/proxystore-benchmarks/main) -[![Tests](https://github.com/proxystore/proxystore-benchmarks/actions/workflows/tests.yml/badge.svg)](https://github.com/proxystore/proxystore-benchmarks/actions) [ProxyStore](https://github.com/proxystore/proxystore) benchmark repository. Check out the [benchmark instructions](docs/) to get started. 
From 164a18749608f60d4a5a9deb5aec48b39427efff Mon Sep 17 00:00:00 2001 From: valhayot Date: Tue, 21 Mar 2023 21:34:59 +0000 Subject: [PATCH 29/31] Add DataSpaces to funcX pong benchmarks --- psbench/benchmarks/globus_compute_tasks/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/psbench/benchmarks/globus_compute_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py index 4b4ba40..7d6403c 100644 --- a/psbench/benchmarks/globus_compute_tasks/main.py +++ b/psbench/benchmarks/globus_compute_tasks/main.py @@ -24,6 +24,7 @@ from psbench import ipfs from psbench.argparse import add_dspaces_options from psbench.argparse import add_globus_compute_options +from psbench.argparse import add_dspaces_options from psbench.argparse import add_ipfs_options from psbench.argparse import add_logging_options from psbench.argparse import add_proxystore_options From 52fcdb33e5aaed5bda82d3e1c9fcac9e4a4bf13f Mon Sep 17 00:00:00 2001 From: valhayot Date: Sun, 25 Jun 2023 15:10:57 -0500 Subject: [PATCH 30/31] Convert FuncX to Globus Compute for DataSpaces --- psbench/benchmarks/globus_compute_tasks/main.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/psbench/benchmarks/globus_compute_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py index 7d6403c..c713e80 100644 --- a/psbench/benchmarks/globus_compute_tasks/main.py +++ b/psbench/benchmarks/globus_compute_tasks/main.py @@ -158,15 +158,15 @@ def time_task_ipfs( def time_task_dspaces( *, - fx: funcx.FuncXExecutor, + gce: globus_compute_sdk.Executor, input_size: int, output_size: int, task_sleep: float, ) -> TaskStats: - """Execute and time a single FuncX task with DataSpaces for data transfer. + """Execute and time a single Globus Compute task with DataSpaces for data transfer. Args: - fx (FuncXExecutor): FuncX Executor to submit task through. + gce (Executor): Globus Compute Executor to submit task through. input_size (int): number of bytes to send as input to task. output_size (int): number of bytes task should return. task_sleep (int): number of seconds to sleep inside task. @@ -191,7 +191,7 @@ def time_task_dspaces( start = time.perf_counter_ns() client.Put(np.array(bytearray(data)), path, version=version, offset=((input_size * rank),)) - fut = fx.submit( + fut = gce.submit( pong_dspaces, path, input_size, @@ -361,7 +361,7 @@ def runner( ) elif use_dspaces: stats = time_task_dspaces( - fx=fx, + gce=gce, input_size=input_size, output_size=output_size, task_sleep=task_sleep, From 5828ea4b24909d20cc8842c5b32e756cbe057ca6 Mon Sep 17 00:00:00 2001 From: valhayot Date: Sat, 19 Aug 2023 17:32:48 +0000 Subject: [PATCH 31/31] Fix pre-commit issues --- dataspaces.conf | 1 - psbench/argparse.py | 2 +- .../benchmarks/globus_compute_tasks/main.py | 17 +++++++++++------ psbench/tasks/pong.py | 19 ++++++++++--------- pyproject.toml | 6 ------ .../globus_compute_tasks/main_test.py | 11 +++++++++-- 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/dataspaces.conf b/dataspaces.conf index bec9f21..eeda567 100644 --- a/dataspaces.conf +++ b/dataspaces.conf @@ -3,4 +3,3 @@ ndim = 1 dims = 10000000000 max_versions = 1 num_apps = 1 - diff --git a/psbench/argparse.py b/psbench/argparse.py index 7af0a27..9bfa25d 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -60,7 +60,7 @@ def add_dspaces_options(parser: argparse.ArgumentParser) -> None: Args: parser (ArgumentParser): parser object to add DataSpaces arguments to. 
""" - args_str = ' '.join(sys.argv) + ' '.join(sys.argv) parser.add_argument( '--dspaces', action='store_true', diff --git a/psbench/benchmarks/globus_compute_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py index 2adc4eb..6748026 100644 --- a/psbench/benchmarks/globus_compute_tasks/main.py +++ b/psbench/benchmarks/globus_compute_tasks/main.py @@ -24,7 +24,6 @@ from psbench import ipfs from psbench.argparse import add_dspaces_options from psbench.argparse import add_globus_compute_options -from psbench.argparse import add_dspaces_options from psbench.argparse import add_ipfs_options from psbench.argparse import add_logging_options from psbench.argparse import add_proxystore_options @@ -187,10 +186,15 @@ def time_task_dspaces( path = str(uuid.uuid4()) data = randbytes(input_size) - local_size = input_size / size + input_size / size start = time.perf_counter_ns() - client.Put(np.array(bytearray(data)), path, version=version, offset=((input_size * rank),)) + client.Put( + np.array(bytearray(data)), + path, + version=version, + offset=((input_size * rank),), + ) fut = gce.submit( pong_dspaces, path, @@ -201,9 +205,9 @@ def time_task_dspaces( result_size=output_size, sleep=task_sleep, ) - + result = fut.result() - + if result is not None: out_path = result[0] out_size = result[1] @@ -325,7 +329,8 @@ def runner( if store is not None and (use_ipfs or use_dspaces): raise ValueError( - f'{"IPFS" if use_ipfs else "DataSpaces"} and ProxyStore cannot be used at the same time.', + f"""{"IPFS" if use_ipfs else "DataSpaces"} and ProxyStore + cannot be used at the same time.""", ) runner_start = time.perf_counter_ns() diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py index a10cf70..8f2fa36 100644 --- a/psbench/tasks/pong.py +++ b/psbench/tasks/pong.py @@ -80,28 +80,27 @@ def pong_dspaces( version: int = 1, result_size: int = 0, sleep: float = 0, -) -> str | None: +) -> tuple[str, int] | None: """Task that takes a DataSpace path and returns data via DataSpaces. Args: client (ds.DSpaces):DataSpaces client path (str): filename of the DataSpaces stored data. - data_size (int) : the size of the DataSpaces object. - rank (int) : MPI rank. - size (int): MPI communication size. - version (int): The version of the data to access (default: 1). + data_size (int) : the size of the DataSpaces object. + rank (int) : MPI rank. + size (int): MPI communication size. + version (int): The version of the data to access (default: 1). result_size (int): size of results byte array (default: 0). sleep (float): seconds to sleep for to simulate work (default: 0). Returns: Filename of return data or None. 
""" - import os import time import uuid - import numpy as np import dspaces as ds + import numpy as np from psbench.utils import randbytes @@ -122,7 +121,10 @@ def pong_dspaces( filepath = str(uuid.uuid4()) return_data = bytearray(randbytes(result_size)) client.Put( - np.array(return_data), filepath, version=version, offset=((result_size * rank),) + np.array(return_data), + filepath, + version=version, + offset=((result_size * rank),), ) return (filepath, result_size) else: @@ -159,7 +161,6 @@ def pong_proxy( from proxystore.proxy import is_resolved from proxystore.proxy import Proxy from proxystore.store import get_store - from proxystore.store.utils import resolve_async from psbench.tasks.pong import ProxyStats from psbench.utils import randbytes diff --git a/pyproject.toml b/pyproject.toml index e5c7531..a64a7d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,12 +150,6 @@ parametrize-values-type = "tuple" [tool.ruff.flake8-quotes] inline-quotes = "single" -[tool.ruff.flake8-pytest-style] -parametrize-values-type = "tuple" - -[tool.ruff.flake8-quotes] -inline-quotes = "single" - [tool.ruff.isort] force-single-line = true known-first-party = ["psbench", "test", "testing"] diff --git a/tests/benchmarks/globus_compute_tasks/main_test.py b/tests/benchmarks/globus_compute_tasks/main_test.py index 49623ef..e74e4d1 100644 --- a/tests/benchmarks/globus_compute_tasks/main_test.py +++ b/tests/benchmarks/globus_compute_tasks/main_test.py @@ -109,13 +109,18 @@ def test_time_task_proxy() -> None: @pytest.mark.parametrize( - ('use_ipfs', 'use_proxystore', 'log_to_csv'), - ((False, True, False), (True, False, False), (False, False, True)), + ('use_ipfs', 'use_proxystore', 'use_dspaces', 'log_to_csv'), + ( + (False, True, False, False), + (True, False, False, False), + (False, False, False, True), + ), ) def test_runner( caplog, use_ipfs: bool, use_proxystore: bool, + use_dspaces: bool, log_to_csv: bool, tmp_path: pathlib.Path, ) -> None: @@ -145,6 +150,7 @@ def test_runner( runner( globus_compute_endpoint=str(uuid.uuid4()), store=store, + use_dspaces=use_dspaces, use_ipfs=use_ipfs, ipfs_local_dir=str(ipfs_local_dir), ipfs_remote_dir=str(ipfs_remote_dir), @@ -171,6 +177,7 @@ def test_runner_error() -> None: globus_compute_endpoint=str(uuid.uuid4()), store=store, use_ipfs=True, + use_dspaces=False, ipfs_local_dir='/tmp/local/', ipfs_remote_dir='/tmp/remote/', input_sizes=[0],