Gopherate is a framework for building distributed systems using ZeroMQ. It provides a set of tools and abstractions to create producers, consumers, and brokers that communicate using messages.
- Producers and Consumers: Easily create producers and consumers that can send and receive messages.
- Broker: A broker that routes messages between producers and consumers.
- FSM (Finite State Machine): A flexible FSM implementation to manage the state of producers and consumers.
- Message Handling: Define and interpret messages with ease.
- Error and Log Channels: Centralized error and log handling.
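As the usage examples later in this README suggest, a task is essentially a function name plus string arguments, and a consumer maps that name to a handler. The following is a minimal in-process sketch of that idea, with no ZeroMQ or broker involved. `Task`, `Handler`, `dispatch`, and the `upper` task are hypothetical names chosen for illustration; they are not Gopherate's API.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Task is a hypothetical interface: a function name plus string arguments.
type Task interface {
	Function() string
	Args() []string
}

// Handler is an assumed handler shape: string arguments in, results and errors out.
type Handler func(args []string) ([]string, []error)

// upperTask is an illustrative task asking for its words to be upper-cased.
type upperTask struct{ words []string }

func (t *upperTask) Function() string { return "upper" }
func (t *upperTask) Args() []string   { return t.words }

// upperHandler upper-cases every argument it receives.
func upperHandler(args []string) ([]string, []error) {
	out := make([]string, len(args))
	for i, a := range args {
		out[i] = strings.ToUpper(a)
	}
	return out, nil
}

// dispatch looks up the handler registered under the task's function name
// and calls it with the task's arguments.
func dispatch(handlers map[string]Handler, task Task) ([]string, []error) {
	handler, ok := handlers[task.Function()]
	if !ok {
		return nil, []error{errors.New("no handler for " + task.Function())}
	}
	return handler(task.Args())
}

func main() {
	handlers := map[string]Handler{"upper": upperHandler}
	results, errs := dispatch(handlers, &upperTask{words: []string{"hello", "gopher"}})
	fmt.Println(results, errs)
}
```

In the real framework the lookup happens on a consumer after the broker has routed the message; the sketch only shows the name-to-handler step.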
As specified in the ZMQ implementation (zmq4), this package depends on the native ZeroMQ library:
zmq4 is just a wrapper for the ZeroMQ library. It doesn't include the library itself.
Check the original 0MQ documentation and the libzmq repository.
On Ubuntu-like systems, run the following:
sudo apt install pkg-config libczmq-dev
You should now be able to check the installation:
$> pkg-config --modversion libzmq
4.3.4
$> apt list --installed | grep zmq
WARNING: apt does not have a stable CLI interface. Use with caution in scripts.
libczmq-dev/jammy,now 4.2.1-1 amd64 [installed]
libczmq4/jammy,now 4.2.1-1 amd64 [installed,automatic]
libzmq3-dev/jammy,now 4.3.4-2 amd64 [installed,automatic]
libzmq5/jammy,now 4.3.4-2 amd64 [installed,automatic]
To install Gopherate, use go get:
go get github.com/Pigotz/gopherate
For a complete example, see the E2E tests implemented here.
In case you want to run the standalone Broker, find its implementation here.
ctx := context.Background()
broker, err := broker.NewBroker("tcp://*:5555", nil)
if err != nil {
	panic(err)
}
defer broker.Close()
err = broker.Bind()
if err != nil {
	panic(err)
}
// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
broker.Run(ctx, nil, nil)
Define the Fibonacci task:
type ComputeFibonacciTask struct {
	steps int
}
func (w *ComputeFibonacciTask) Function() string {
	return "fibonacci"
}
func (w *ComputeFibonacciTask) Args() []string {
	return []string{strconv.Itoa(w.steps)}
}
Launch the producer:
ctx := context.Background()
producer, err := producer.NewProducer("producer-ID", "tcp://localhost:5555", nil)
if err != nil {
	panic(err)
}
defer producer.Close()
err = producer.Connect()
if err != nil {
	panic(err)
}
// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
producer.Run(ctx, nil, nil)
Submit the Fibonacci task to the network of consumers:
computeFibonacciTask := &ComputeFibonacciTask{
	steps: 100,
}
results, err := producer.Process(ctx, computeFibonacciTask, 5*time.Second)
if err != nil {
	panic(err)
}
// In this specific example, the result is a single string
fmt.Printf("Fibonacci result: %s\n", results[0])
The Fibonacci implementation:
func fibonacciHandler(args []string) ([]string, []error) {
	if len(args) != 1 {
		return nil, []error{errors.New("expected 1 argument")}
	}
	steps, err := strconv.Atoi(args[0])
	if err != nil {
		return nil, []error{err}
	}
	// Use math/big: Fibonacci(100) does not fit in a 64-bit int
	a, b := big.NewInt(0), big.NewInt(1)
	for i := 0; i < steps; i++ {
		// a becomes a+b, then the pair is rotated to (b, a+b)
		a.Add(a, b)
		a, b = b, a
	}
	return []string{a.String()}, nil
}
The main code:
ctx := context.Background()
consumer, err := consumer.NewConsumer("consumer-ID", "tcp://localhost:5555", nil, consumer.Handlers{
	"fibonacci": fibonacciHandler,
})
if err != nil {
	panic(err)
}
defer consumer.Close()
err = consumer.Connect()
if err != nil {
	panic(err)
}
// You should spawn this in a goroutine
// The nil arguments are the error and log channels - see more below
consumer.Run(ctx, nil, nil)
You can pass error and log channels to the broker, producer, and consumer. This way, you can centralize the error and log handling.
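The centralization idea can be sketched without Gopherate at all: several writers share one error channel and one log channel, and a single reader drains both. In this sketch `component` is a hypothetical stand-in for a broker, producer, or consumer, and `drain` stops when both channels are closed, which is a simplification compared with the context-based print functions in this README.

```go
package main

import (
	"fmt"
	"sync"
)

// component is a hypothetical stand-in for a broker, producer, or consumer:
// it reports one log line and one error on the shared channels.
func component(name string, errs chan<- error, logs chan<- string) {
	logs <- name + " started"
	errs <- fmt.Errorf("%s: simulated failure", name)
}

// drain reads from both channels until both are closed, passing every
// formatted line to sink. A closed channel is set to nil so that select
// stops choosing it.
func drain(errs <-chan error, logs <-chan string, sink func(string)) {
	for errs != nil || logs != nil {
		select {
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			sink("[ERROR] " + err.Error())
		case msg, ok := <-logs:
			if !ok {
				logs = nil
				continue
			}
			sink("[LOG] " + msg)
		}
	}
}

func main() {
	// Buffered so that writers do not block while the reader is busy.
	errs := make(chan error, 100)
	logs := make(chan string, 100)

	var writers sync.WaitGroup
	for _, name := range []string{"broker", "producer", "consumer"} {
		writers.Add(1)
		go func(name string) {
			defer writers.Done()
			component(name, errs, logs)
		}(name)
	}

	// Close the channels only after every writer has finished.
	go func() {
		writers.Wait()
		close(errs)
		close(logs)
	}()

	drain(errs, logs, func(line string) { fmt.Println(line) })
}
```

The order of the printed lines is not deterministic, since the three writers run concurrently.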
Print functions:
func PrintErrors(ctx context.Context, errors chan error) {
	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return
		case err := <-errors:
			fmt.Printf("[%s] [ERROR] %s\n", time.Now().UTC().String(), err)
		}
	}
}
func PrintLogs(ctx context.Context, logs chan string) {
	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return
		case log := <-logs:
			fmt.Printf("[%s] [LOG] %s\n", time.Now().UTC().String(), log)
		}
	}
}
Passing the channels:
var waitGroup sync.WaitGroup
// Buffer is necessary to prevent blocking
errors := make(chan error, 100)
defer close(errors)
waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	PrintErrors(ctx, errors)
}()
// Buffer is necessary to prevent blocking
logs := make(chan string, 100)
defer close(logs)
waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	PrintLogs(ctx, logs)
}()
// broker.Run(ctx, errors, logs)
// producer.Run(ctx, errors, logs)
// consumer.Run(ctx, errors, logs)
waitGroup.Wait()
You can prefix the errors and logs with the utility functions you can find in the channels package.
// Example usage
// `errors` and `logs` have already been defined before
var waitGroup sync.WaitGroup
brokerErrors := make(chan error, 100)
defer close(brokerErrors)
waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	channels.WrapErrorChannel(ctx, "[BROKER]", brokerErrors, errors)
}()
brokerLogs := make(chan string, 100)
defer close(brokerLogs)
waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	channels.PrefixStringChannel(ctx, "[BROKER]", brokerLogs, logs)
}()
waitGroup.Add(1)
go func() {
	defer waitGroup.Done()
	broker.Run(ctx, brokerErrors, brokerLogs)
}()
waitGroup.Wait()
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
To run the tests, use the following command:
go test ./...
- Pigotz - Pigotz
This project is licensed under the MIT License - see the LICENSE file for details.
