A PHP library implementing the RabbitMQ Streams Protocol client.
It provides low-level TCP communication with a RabbitMQ broker over the native Stream protocol (port 5552), including binary frame serialization/deserialization.
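To make "binary frame serialization" concrete, here is a minimal sketch of how a request frame is laid out on the wire according to the Stream protocol reference: a big-endian uint32 size, then a uint16 key, a uint16 version, a uint32 correlation ID, and the command content. The helper names `encodeString` and `encodeFrame` are hypothetical and are not part of this library's API; the example uses the Open command (key 0x0015), whose content is the virtual host string.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: protocol strings are an int16 length followed by the bytes.
function encodeString(string $value): string
{
    return pack('n', strlen($value)) . $value;
}

// Hypothetical helper: prefixes key, version, correlation ID and content
// with the uint32 frame size (the size field does not count itself).
function encodeFrame(int $key, int $version, int $correlationId, string $content): string
{
    $payload = pack('nnN', $key, $version, $correlationId) . $content;

    return pack('N', strlen($payload)) . $payload;
}

// Open request (key 0x0015, version 1, correlation ID 1) for virtual host "/".
$frame = encodeFrame(0x0015, 1, 1, encodeString('/'));
echo bin2hex($frame), "\n"; // prints 0000000b001500010000000100012f
```

All integers are big-endian, which is why the sketch uses the `n` (16-bit) and `N` (32-bit) format codes of `pack()`.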
PHP 8.1+
RabbitMQ with the rabbitmq_stream plugin enabled
composer require crazy-goat/rabbit-stream
use CrazyGoat\RabbitStream\Client\Connection;

$connection = Connection::create(host: 'localhost', port: 5552);
$producer = $connection->createProducer('my-stream', name: 'my-producer');
$producer->send('hello world');
$producer->waitForConfirms(timeout: 5);
$producer->close();
$connection->close();
use CrazyGoat\RabbitStream\Client\Connection;
use CrazyGoat\RabbitStream\VO\OffsetSpec;

$connection = Connection::create(host: 'localhost', port: 5552);
$consumer = $connection->createConsumer('my-stream', offset: OffsetSpec::first());
while ($messages = $consumer->read(timeout: 5)) {
    foreach ($messages as $msg) {
        echo $msg->getBody() . "\n";
    }
}
$consumer->close();
$connection->close();
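The `offset:` argument above selects where consumption starts. As a rough illustration of what such a specification looks like on the wire, the sketch below encodes the offset types listed in the protocol reference (1 first, 2 last, 3 next, 4 offset, 5 timestamp). The constants and the `encodeOffsetSpec` function are hypothetical names for this example, and it assumes the 64-bit value is only written for the offset and timestamp types; how `OffsetSpec` serializes itself inside this library may differ.

```php
<?php
declare(strict_types=1);

// Offset type codes from the Stream protocol reference.
const OFFSET_FIRST = 1;
const OFFSET_LAST = 2;
const OFFSET_NEXT = 3;
const OFFSET_OFFSET = 4;
const OFFSET_TIMESTAMP = 5;

// Hypothetical encoder: uint16 type, plus a 64-bit big-endian value
// when the type is an absolute offset or a timestamp (assumption).
function encodeOffsetSpec(int $type, int $value = 0): string
{
    $encoded = pack('n', $type);
    if ($type === OFFSET_OFFSET || $type === OFFSET_TIMESTAMP) {
        $encoded .= pack('J', $value);
    }

    return $encoded;
}

echo bin2hex(encodeOffsetSpec(OFFSET_FIRST)), "\n";      // prints 0001
echo bin2hex(encodeOffsetSpec(OFFSET_OFFSET, 42)), "\n"; // prints 0004000000000000002a
```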
High-level API (Recommended)
use CrazyGoat\RabbitStream\Client\Connection;
use CrazyGoat\RabbitStream\Client\ConfirmationStatus;

// Connect (handshake and authentication handled automatically)
$connection = Connection::create(
    host: '127.0.0.1',
    user: 'guest',
    password: 'guest'
);

// Create a producer for 'my-stream'
$producer = $connection->createProducer(
    stream: 'my-stream',
    onConfirm: function (ConfirmationStatus $status): void {
        if ($status->isConfirmed()) {
            echo "Message {$status->getPublishingId()} confirmed\n";
        }
    }
);

// Send a message
$producer->send("Hello, RabbitMQ Stream!");

// Drive the loop to receive confirmations (optional, blocking)
$connection->readLoop(maxFrames: 1);

// Close producer and connection
$producer->close();
$connection->close();
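A read loop of this kind has to cut the incoming TCP byte stream into whole frames before any confirmation can be dispatched. The sketch below shows one way to do that with the protocol's uint32 size prefix; `extractFrames` is a hypothetical helper for illustration only and says nothing about how `readLoop()` is implemented in this library.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: removes every complete frame from the front of
 * $buffer and returns the frame bodies (without the 4-byte size prefix).
 * An incomplete trailing frame stays in $buffer until more bytes arrive.
 *
 * @return string[]
 */
function extractFrames(string &$buffer): array
{
    $frames = [];
    while (strlen($buffer) >= 4) {
        $size = unpack('N', $buffer)[1]; // big-endian uint32 length
        if (strlen($buffer) < 4 + $size) {
            break; // wait for the rest of this frame
        }
        $frames[] = substr($buffer, 4, $size);
        $buffer = substr($buffer, 4 + $size);
    }

    return $frames;
}

// One complete 3-byte frame followed by the first 2 bytes of a 5-byte frame.
$buffer = pack('N', 3) . 'abc' . pack('N', 5) . 'de';
$frames = extractFrames($buffer);
// $frames now holds ['abc']; $buffer still holds the partial second frame.
```

Keeping the leftover bytes in the buffer lets the caller append the next socket read and call the function again.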
Consuming with Message Decoding
use CrazyGoat\RabbitStream\Client\AmqpMessageDecoder;
use CrazyGoat\RabbitStream\Client\OsirisChunkParser;

// ... subscribe to stream and receive Deliver response
$chunk = $deliverResponse->getChunk();
$entries = OsirisChunkParser::parse($chunk);

// Decode AMQP 1.0 messages into Message objects
$messages = AmqpMessageDecoder::decodeAll($entries);
foreach ($messages as $message) {
    echo "Offset: {$message->getOffset()}\n";
    echo "Body: {$message->getBody()}\n";
    echo "Content-Type: {$message->getContentType()}\n";
    echo "Message-ID: {$message->getMessageId()}\n";
}
use CrazyGoat\RabbitStream\StreamConnection;
use CrazyGoat\RabbitStream\Request\PeerPropertiesRequestV1;
...
See examples/simple_publisher.php for a full working example.
Protocol Implementation Status
Protocol reference: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc
Connection & Authentication
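| Command | Key | Request | Response |
|---|---|---|---|
| PeerProperties | 0x0011 | ✅ | ✅ |
| SaslHandshake | 0x0012 | ✅ | ✅ |
| SaslAuthenticate | 0x0013 | ✅ | ✅ |
| Tune | 0x0014 | ✅ | ✅ |
| Open | 0x0015 | ✅ | ✅ |

| Command | Key | Request | Response |
|---|---|---|---|
| DeclarePublisher | 0x0001 | ✅ | ✅ |
| Publish | 0x0002 | ✅ | — |
| PublishConfirm | 0x0003 | — | ✅ |
| PublishError | 0x0004 | — | ✅ |
| QueryPublisherSequence | 0x0005 | ✅ | ✅ |
| DeletePublisher | 0x0006 | ✅ | ✅ |

| Command | Key | Request | Response |
|---|---|---|---|
| Subscribe | 0x0007 | ✅ | ✅ |
| Deliver | 0x0008 | — | ✅ |
| Credit | 0x0009 | ✅ | ✅ |
| StoreOffset | 0x000a | ✅ | — |
| QueryOffset | 0x000b | ✅ | ✅ |
| Unsubscribe | 0x000c | ✅ | ✅ |
| ConsumerUpdate | 0x001a | ✅ | ✅ |

| Command | Key | Request | Response |
|---|---|---|---|
| Create | 0x000d | ✅ | ✅ |
| Delete | 0x000e | ✅ | ✅ |
| Metadata | 0x000f | ✅ | ✅ |
| MetadataUpdate | 0x0010 | — | ✅ |
| CreateSuperStream | 0x001d | ✅ | ✅ |
| DeleteSuperStream | 0x001e | ✅ | ✅ |
| StreamStats | 0x001c | ✅ | ✅ |

| Command | Key | Request | Response |
|---|---|---|---|
| Route | 0x0018 | ✅ | ✅ |
| Partitions | 0x0019 | ✅ | ✅ |

| Command | Key | Request | Response |
|---|---|---|---|
| Close | 0x0016 | ✅ | ✅ |
| Heartbeat | 0x0017 | ✅ | — |
| ExchangeCommandVersions | 0x001b | ✅ | ✅ |
| ResolveOffsetSpec | 0x001f | ✅ | ✅ |
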
Legend: ✅ implemented, ❌ not implemented, — not applicable (one-direction command)
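The keys listed above are what appears in the second field of every frame. In the protocol reference, a response carries the same key as its request with the most significant bit set (for example 0x8015 for an Open response), and most responses continue with a uint32 correlation ID and a uint16 response code where 1 means OK. The sketch below decodes that header; `decodeFrameHeader` and the abbreviated `COMMANDS` map are hypothetical names for this illustration, not classes or functions from this library.

```php
<?php
declare(strict_types=1);

// A few command keys for illustration (not the complete set).
const COMMANDS = [
    0x0001 => 'DeclarePublisher',
    0x0008 => 'Deliver',
    0x0015 => 'Open',
    0x0016 => 'Close',
    0x0017 => 'Heartbeat',
];

// Hypothetical helper: reads size, key and version from the start of a frame.
function decodeFrameHeader(string $frame): array
{
    if (strlen($frame) < 8) {
        throw new InvalidArgumentException('Frame too short for size, key and version');
    }
    $h = unpack('Nsize/nkey/nversion', $frame);
    $command = $h['key'] & 0x7FFF; // strip the response bit

    return [
        'size' => $h['size'],
        'command' => $command,
        'name' => COMMANDS[$command] ?? 'unknown',
        'isResponse' => ($h['key'] & 0x8000) !== 0,
        'version' => $h['version'],
    ];
}

// A hand-built Open response: correlation ID 7, response code 1 (OK).
$frame = pack('NnnNn', 10, 0x8015, 1, 7, 1);
$header = decodeFrameHeader($frame);
$body = unpack('NcorrelationId/nresponseCode', $frame, 8);

printf("%s response=%s code=%d\n", $header['name'], $header['isResponse'] ? 'yes' : 'no', $body['responseCode']);
// prints: Open response=yes code=1
```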