Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 0 additions & 119 deletions examples/lowlevel_api.py

This file was deleted.

33 changes: 33 additions & 0 deletions examples/simple_async_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

import ezmsg.core as ez

TOPIC = "/TEST"


async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
async with ez.GraphContext((host, port), auto_start=True) as ctx:
pub = await ctx.publisher(TOPIC)
try:
print("Publisher Task Launched")
count = 0
while True:
await pub.broadcast(f"{count=}")
await asyncio.sleep(0.1)
count += 1
except asyncio.CancelledError:
pass
finally:
print("Publisher Task Concluded")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")

args = parser.parse_args()

asyncio.run(main(host=args.host, port=args.port))
35 changes: 35 additions & 0 deletions examples/simple_async_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

import ezmsg.core as ez

PORT = 12345
TOPIC = "/TEST"


async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
async with ez.GraphContext((host, port), auto_start=True) as ctx:
sub = await ctx.subscriber(TOPIC)
try:
print("Subscriber Task Launched")
while True:
async with sub.recv_zero_copy() as msg:
# Uncomment if you want to witness backpressure!
# await asyncio.sleep(1.0)
print(msg)
except asyncio.CancelledError:
pass
finally:
print("Subscriber Task Concluded")
print("Detached")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")

args = parser.parse_args()

asyncio.run(main(host=args.host, port=args.port))
35 changes: 35 additions & 0 deletions examples/simple_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import time

import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
with ez.sync.init((host, port), auto_start=True) as ctx:
pub = ctx.create_publisher(TOPIC, force_tcp=True)

print("Publisher Task Launched")
count = 0
try:
while True:
output = f"{count=}"
pub.publish(output)
print(output)
time.sleep(0.1)
count += 1
except KeyboardInterrupt:
pass
print("Publisher Task Concluded")

print("Done")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
args = parser.parse_args()

main(host=args.host, port=args.port)
30 changes: 30 additions & 0 deletions examples/simple_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time
import ezmsg.core as ez

TOPIC = "/TEST"


def main(host: str = "127.0.0.1", port: int = 12345) -> None:
with ez.sync.init((host, port), auto_start=True) as ctx:
print("Subscriber Task Launched")

def on_message(msg: str) -> None:
# Uncomment if you want to witness backpressure!
# time.sleep(1.0)
print(msg)

ctx.create_subscription(TOPIC, callback=on_message)
ez.sync.spin(ctx)

print("Subscriber Task Concluded")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
args = parser.parse_args()

main(host=args.host, port=args.port)
6 changes: 6 additions & 0 deletions src/ezmsg/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
"NormalTermination",
"GraphServer",
"GraphContext",
"sync",
"SyncContext",
"SyncPublisher",
"SyncSubscriber",
"run_command",
"Publisher",
"Subscriber",
Expand All @@ -45,6 +49,8 @@
from .backendprocess import Complete, NormalTermination
from .graphserver import GraphServer
from .graphcontext import GraphContext
from . import sync
from .sync import SyncContext, SyncPublisher, SyncSubscriber
from .command import run_command
from .pubclient import Publisher
from .subclient import Subscriber
Expand Down
10 changes: 9 additions & 1 deletion src/ezmsg/core/graphcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class GraphContext:

:param graph_service: Optional graph service instance to use
:type graph_service: GraphService | None
:param auto_start: Whether to auto-start a GraphServer if connection fails.
If None, defaults to auto-start only when graph_address is not provided
and no environment override is set.
:type auto_start: bool | None

.. note::
The GraphContext is typically managed automatically by the ezmsg runtime
Expand All @@ -40,11 +44,13 @@ class GraphContext:
def __init__(
self,
graph_address: AddressType | None = None,
auto_start: bool | None = None,
) -> None:
self._clients = set()
self._edges = set()
self._graph_address = graph_address
self._graph_server = None
self._auto_start = auto_start

@property
def graph_address(self) -> AddressType | None:
Expand Down Expand Up @@ -130,7 +136,9 @@ async def resume(self) -> None:
await GraphService(self.graph_address).resume()

async def _ensure_servers(self) -> None:
self._graph_server = await GraphService(self.graph_address).ensure()
self._graph_server = await GraphService(self.graph_address).ensure(
auto_start=self._auto_start
)

async def _shutdown_servers(self) -> None:
if self._graph_server is not None:
Expand Down
10 changes: 7 additions & 3 deletions src/ezmsg/core/graphserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def start(self, address: AddressType | None = None) -> None: # type: ignore[ove
self._loop = asyncio.new_event_loop()
super().start()
self._server_up.wait()
logger.info(f'Started GraphServer at {address}')

def stop(self) -> None:
self._shutdown.set()
Expand Down Expand Up @@ -453,15 +454,18 @@ def create_server(self) -> GraphServer:
self._address = server.address
return server

async def ensure(self) -> GraphServer | None:
async def ensure(self, auto_start: bool | None = None) -> GraphServer | None:
"""
Try connecting to an existing server. If none is listening and no explicit
address/environment is set, start one and return it. If an existing one is
found, return None.
found, return None. If auto_start is provided, it overrides the default
behavior.
"""
server = None
ensure_server = False
if self._address is None:
if auto_start is not None:
ensure_server = auto_start
elif self._address is None:
# Only auto-start if env var not forcing a location
ensure_server = self.ADDR_ENV not in os.environ

Expand Down
Loading