Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ target/
.venv/
venv/

# Git worktrees
.worktrees/

# macOS
.DS_Store
._*
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ to follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- `Manager.send_action` no longer raises a raw `AttributeError` when a concurrent `disconnect()` clears the connection between the liveness check and the send. It now drops the just-registered request and raises `ManagerSocketError` instead (#3).
- The FastAGI server no longer prints an unhandled traceback to stderr when a client disconnects during the AGI environment handshake. A caller hanging up, Asterisk aborting the leg, or a bare TCP probe raised `AGISIGPIPEHangup` from the handler and printed a full traceback for a routine event. The handler now ends the request quietly. Errors raised by the script handler itself still propagate (#49).
- The FastAGI server sizes its listen backlog without shelling out to `sysctl`. It now requests the maximum backlog and lets the operating system cap it: Unix-like kernels clamp it to the live `somaxconn`, which still tracks a tuned-up limit automatically, and Windows applies the Winsock maximum. This also lifts the platform restriction added in 1.3.0: the server previously raised `NotImplementedError` on any host that was neither Linux nor macOS, and now runs everywhere (#32).
- `Originate_Application` now treats a plain string `data` argument as one application argument instead of splitting it into comma-separated characters (#11).
- `Originate_Application` now rejects bytes-like `data` with `TypeError` instead of serializing byte values as comma-separated integers (#11).
- AMI request building now rejects header keys and values containing CR or LF, including `Action` and `ActionID`, by raising `ValueError`.

## [1.3.0] - 2026-06-24

Expand Down
25 changes: 21 additions & 4 deletions pystrix/ami/ami.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ def send_action(self, request, action_id=None, **kwargs):
Raises `ManagerSocketError` if the socket is broken during transmission, including when a
concurrent `disconnect()` closes the connection before the request is sent.

Raises `ValueError` if any header key or value contains CR or LF.

This function is thread-safe.
"""
if not self.is_connected():
Expand Down Expand Up @@ -1056,18 +1058,33 @@ def build_request(self, action_id, id_generator, **kwargs):
`**kwargs` is a dictionary of additional arguments that may be passed along with the request
at submission time.

Raises `ValueError` if any header key or value contains CR or LF.

The 'Action' line is always first.
"""
items = [(KEY_ACTION, self[KEY_ACTION])]

def validate_header(key, value):
# AMI v2 uses '\r\n' as the field separator and a bare '\r\n' (blank
# line) as the packet terminator. The protocol defines no escaping
# mechanism, so a literal CR or LF inside a key or value is
# indistinguishable from a field boundary or end-of-packet marker.
# See: https://docs.asterisk.org/Configuration/Interfaces/Asterisk-Manager-Interface-AMI/AMI-v2-Specification/
if "\r" in key or "\n" in key:
raise ValueError("AMI header keys must not contain CR or LF")
if "\r" in value or "\n" in value:
raise ValueError("AMI header values must not contain CR or LF")
return (key, value)

items = [validate_header(KEY_ACTION, str(self[KEY_ACTION]))]
for key, value in [
(k, v) for (k, v) in self.items() if k not in (KEY_ACTION, KEY_ACTIONID)
] + list(kwargs.items()):
key = str(key)
if type(value) in (tuple, list, set, frozenset):
for val in value:
items.append((key, str(val)))
items.append(validate_header(key, str(val)))
else:
items.append((key, str(value)))
items.append(validate_header(key, str(value)))

# Resolve the ActionID using the precedence documented above: an explicit
# argument wins, then any value already set on the request, then a generated
Expand All @@ -1076,7 +1093,7 @@ def build_request(self, action_id, id_generator, **kwargs):
if action_id is None:
action_id = self[KEY_ACTIONID] if KEY_ACTIONID in self else id_generator()
action_id = str(action_id)
items.append((KEY_ACTIONID, action_id))
items.append(validate_header(KEY_ACTIONID, action_id))

return (
_EOL.join(
Expand Down
22 changes: 19 additions & 3 deletions pystrix/ami/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,9 +816,10 @@ def __init__(
`channel` is the destination to be called, expressed as a fully qualified Asterisk channel,
like "SIP/test-account@example.org".

`application` is the name of the application to be executed, and `data` is optionally any
parameters to pass to the application, as an ordered sequence (list or tuple) of strings,
escaped as necessary (the ',' character is special).
`application` is the name of the application to be executed, and `data` is optionally a
single string argument or an ordered sequence (list or tuple) of strings to be joined with
commas. The ',' character is special and should be escaped as needed by the caller.
Bytes-like objects passed as `data` raise `TypeError`.

`timeout`, if given, is the number of milliseconds to wait before dropping an unanwsered
call. If set, the request's timeout value will be set to this number + 2 seconds, removing
Expand All @@ -841,7 +842,22 @@ def __init__(
self, channel, timeout, callerid, variables, account, async_
)
self["Application"] = application
if not isinstance(data, str):
try:
data_view = memoryview(data)
except TypeError:
# Non-buffer sequences fall through to normal Data joining below.
pass
else:
data_view.release()
raise TypeError(
"data must be a string or sequence of strings, "
"not a bytes-like object"
)
if data:
# Plain strings skip the buffer probe and need wrapping for join().
if isinstance(data, str):
data = (data,)
self["Data"] = ",".join((str(d) for d in data))


Expand Down
100 changes: 100 additions & 0 deletions tests/test_ami_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""Tests for AMI request building (`pystrix.ami.ami._Request`)."""

import array
import ctypes
import mmap

import pytest

from pystrix.ami.ami import _Request
from pystrix.ami.core import Originate_Application


def _build(request, action_id=None, **kwargs):
Expand Down Expand Up @@ -34,6 +41,99 @@ def test_multi_value_header_expands_to_repeated_lines():
assert "Variable: b=2\r\n" in command


def test_originate_application_treats_string_data_as_one_argument():
request = Originate_Application("SIP/708", "Playback", "goodbye")

command, _ = _build(request)

assert "Data: goodbye\r\n" in command
assert "Data: g,o,o,d,b,y,e\r\n" not in command


def test_originate_application_preserves_sequence_data_arguments():
request = Originate_Application("SIP/708", "Playback", ("goodbye", "noanswer"))

command, _ = _build(request)

assert "Data: goodbye,noanswer\r\n" in command


@pytest.mark.parametrize(
"data_factory",
[
pytest.param(lambda: b"goodbye", id="bytes"),
pytest.param(lambda: b"", id="empty-bytes"),
pytest.param(lambda: bytearray(b"goodbye"), id="bytearray"),
pytest.param(lambda: bytearray(), id="empty-bytearray"),
pytest.param(lambda: memoryview(b"goodbye"), id="memoryview"),
pytest.param(lambda: memoryview(b""), id="empty-memoryview"),
pytest.param(lambda: array.array("b", b"hi"), id="array"),
pytest.param(lambda: (ctypes.c_ubyte * 2)(104, 105), id="ctypes-array"),
pytest.param(lambda: mmap.mmap(-1, 2), id="mmap"),
],
)
def test_originate_application_rejects_binary_data(data_factory):
data = data_factory()
try:
with pytest.raises(
TypeError,
match="data must be a string or sequence of strings, not a bytes-like object",
):
Originate_Application("SIP/708", "Playback", data)
finally:
if isinstance(data, memoryview):
data.release()
close = getattr(data, "close", None)
if close:
close()


def test_originate_application_omits_empty_string_data():
request = Originate_Application("SIP/708", "Playback", "")

command, _ = _build(request)

assert "Data:" not in command


def test_rejects_header_value_containing_crlf():
request = _Request("Originate")
request["Data"] = "goodbye\r\nInjected: x"

with pytest.raises(ValueError, match="AMI header values must not contain CR or LF"):
_build(request)


def test_rejects_action_value_containing_crlf():
request = _Request("Ping\r\nInjected: x")

with pytest.raises(ValueError, match="AMI header values must not contain CR or LF"):
_build(request)


def test_rejects_header_key_containing_crlf():
request = _Request("Originate")
request["Data\r\nInjected"] = "goodbye"

with pytest.raises(ValueError, match="AMI header keys must not contain CR or LF"):
_build(request)


def test_rejects_multi_value_header_value_containing_crlf():
request = _Request("Originate")
request["Variable"] = ("a=1\r\nInjected: x", "b=2")

with pytest.raises(ValueError, match="AMI header values must not contain CR or LF"):
_build(request)


def test_rejects_action_id_containing_crlf():
request = _Request("Ping")

with pytest.raises(ValueError, match="AMI header values must not contain CR or LF"):
_build(request, action_id="safe\r\nInjected: x")


def test_multi_value_order_preserved_and_blank_line_terminator():
request = _Request("Originate")
request["Variable"] = ("a=1", "b=2")
Expand Down
Loading