Messaging

The messaging module is a backend-agnostic queue + topic surface. Application code uses one API across brokers; the chosen backend handles the underlying protocol (AWS SQS, RabbitMQ AMQP, ActiveMQ STOMP, Kafka).

Connecting

messaging.connect(opts) returns a queue handle. The driver key selects the backend; remaining keys are validated by that backend.

import messaging;

let q = messaging.connect({
    "driver":    "sqs",
    "region":    "us-east-1",
    "queueUrl":  "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
    "accessKey": env.get("AWS_ACCESS_KEY_ID"),
    "secretKey": env.get("AWS_SECRET_ACCESS_KEY")
});

Publishing

publish(payload) accepts a string, bytes, or any value that JSON can serialise. Non-string payloads are JSON-encoded before being sent so the consumer can json.parse them back.

q.publish("ping");
q.publish({"orderId": 42, "amount": 100});

Receiving

receive(timeoutMs) blocks up to the timeout. Returns the message dict or null when the window expires with no message. Each dict carries:

Key Type Notes
body string Raw payload. JSON dicts come back as encoded JSON; parse client-side.
id string Broker-side identifier (informational; not needed for ack).
handle string The receipt handle - pass to ack() to delete.
let msg = q.receive(20000);   /* up to 20s long-poll */
if (msg != null) {
    io.println(msg["body"]);
    q.ack(msg["handle"]);
}

ack() accepts either the handle string or the whole message dict.

Consuming continuously

consume(handler) blocks in a receive loop, dispatching every delivered message to the supplied callable and acknowledging on clean return:

q.consume(func(any msg): void {
    let order = json.parse(msg["body"] as string);
    process(order);
});

The callable runs synchronously inside the consume loop; spawn a task with async.run if you need parallel processing.

Topics (pub/sub)

A queue delivers each message to exactly one consumer. A topic broadcasts each published message to every active subscriber. The two surfaces share the same option keys; the difference is how many subscribers receive a given message.

messaging.topic(opts) returns a topic handle:

let topic = messaging.topic({
    "driver": "rabbitmq",
    "url":    "amqp://localhost/",
    "topic":  "events.user.signup"
});

topic.publish({"userId": 42});

topic.subscribe(func(any msg): void {
    io.println("received: " + (msg["body"] as string));
});

Backends differ in how they implement fan-out:

Driver Fan-out mechanism
rabbitmq Fanout exchange named after topic. Each subscribe() declares a server-named, exclusive, auto-delete queue bound to the exchange.
stomp / activemq /topic/<name> destination namespace; the broker fans out at the destination. Pass destination like a queue would; a bare name is auto-prefixed with /topic/.
kafka Single topic; each subscribe() opens its own consumer group so every subscriber sees every record.
sns AWS SNS Publish for the producer side; subscribers pair an SQS queue with the topic (subscription set up out of band) and poll it.
sqs Not supported. SQS itself does not fan out; AWS SNS is the proper pub/sub primitive and can target SQS subscriptions. Calling messaging.topic({"driver":"sqs",...}) throws with this guidance.

subscribe(handler) blocks for the lifetime of the topic handle. Spawn it on its own task if you want to keep publishing concurrently:

import async;

async.run(func(): void {
    topic.subscribe(func(any msg): void { handleEvent(msg); });
});

topic.publish({"event": "tick"});

close() releases broker-side resources (channel + connection for RabbitMQ, socket for STOMP, writer for Kafka).

RabbitMQ specifics

The RabbitMQ backend speaks AMQP 0.9.1 over TCP. Connect with an amqp:// URL; the backend opens one connection and one channel per queue handle and declares the queue durably on construction.

let q = messaging.connect({
    "driver":     "rabbitmq",
    "url":        "amqp://guest:guest@localhost:5672/",
    "queue":      "orders",
    "exchange":   "",           # default: direct-to-queue
    "routingKey": "orders",     # default: queue name
    "durable":    true          # default
});

Required options:

Key Notes
driver "rabbitmq"
url Standard AMQP URL (user / password / host / port / vhost).
queue Queue name. Declared on connect with the durable flag.

Optional:

Key Default Notes
exchange "" Empty string = default exchange (direct-to-queue routing).
routingKey queue name Routing key for publishes.
durable true Queue survives broker restart.

receive(timeoutMs) polls with AMQP basic.get. The poll cadence inside the timeout window is 50 ms, so timeoutMs=200 makes up to four broker round-trips before returning null. Use a smaller value for a tight latency budget; use the long-polling receive(20000) pattern from SQS instead if you want broker-side waiting.

The lower-level amqp module (amqp.dial, amqp.channel, amqp.declareQueue, amqp.publish, amqp.get, amqp.ack, amqp.close) is available for cases the MessageQueue facade doesn't cover (custom exchange topologies, QoS prefetch, multiple consumers per channel).

STOMP specifics (ActiveMQ + RabbitMQ-via-plugin)

STOMP 1.2 is a small text-based protocol supported natively by ActiveMQ, ActiveMQ Artemis, and several other brokers, and available on RabbitMQ via the rabbitmq_stomp plugin. The driver name "stomp" and the alias "activemq" both select this backend.

let q = messaging.connect({
    "driver":      "stomp",
    "host":        "broker.internal",
    "port":        61613,
    "destination": "/queue/orders",
    "login":       env.get("STOMP_USER"),
    "passcode":    env.get("STOMP_PASS")
});

Required options:

Key Notes
driver "stomp" or "activemq"
host Broker host.
port Broker port (typically 61613 for plain, 61614 for TLS).
destination Queue or topic name. ActiveMQ uses /queue/foo / /topic/foo; RabbitMQ STOMP uses /queue/foo / /exchange/foo.

Optional:

Key Default Notes
login "" Broker username.
passcode "" Broker password.
virtualHost "/" STOMP host header.
ackMode "client-individual" STOMP subscription ack mode.

The driver opens one TCP connection, sends CONNECT, expects CONNECTED, then sends one SUBSCRIBE against the destination. Subsequent receive() calls block reading the next MESSAGE frame on that subscription.

publish() uses the standard STOMP SEND frame with explicit content-length for binary-safe payloads. ack() sends an ACK frame referencing the subscription's ack id (preferred over message-id per STOMP 1.2). close() sends DISCONNECT and closes the socket.

Kafka specifics

The Kafka backend speaks the native Kafka protocol via the underlying segmentio/kafka-go client. The MessageQueue facade maps onto a single topic with one consumer-group member per handle: publish() produces records, receive() fetches the next record assigned to this group, and ack() commits the offset for the last fetched record.

let q = messaging.connect({
    "driver":  "kafka",
    "brokers": ["kafka-0:9092", "kafka-1:9092"],
    "topic":   "orders",
    "groupId": "order-processor"
});

Required options:

Key Notes
driver "kafka"
brokers list of host:port bootstrap brokers.
topic Topic name.
groupId Consumer-group id used by receive / ack.

Optional:

Key Default Notes
autoCreateTopic false When true, the writer asks the broker to create the topic on first publish.

receive(timeoutMs) calls FetchMessage with that deadline. Unlike the queue brokers, Kafka delivers records at the consumer's pace - fetching does not remove anything from the log. ack() commits the offset of the last fetched record to the group's committed-offset store; restarting the consumer with the same groupId resumes after the last committed offset.

For lower-level access (manual partition assignment, custom balancers, headers), the kafka native module is exposed: kafka.writer, kafka.write, kafka.reader, kafka.read, kafka.commit, kafka.close.

SQS specifics

The SQS backend speaks the REST API (query-string form) signed with AWS Signature V4. No long-lived connection: each call is an independent HTTPS request. receive(timeoutMs) maps to the SQS WaitTimeSeconds parameter rounded up to whole seconds, capped at the SQS-side maximum of 20.

Required options:

Key Notes
driver "sqs"
region AWS region of the queue (used in the credential scope).
queueUrl Full URL returned by SQS at queue creation.
accessKey IAM access key ID. Read from env in production.
secretKey IAM secret access key. Read from env in production.

The implementation is in stdlib/messaging/sqs.gb and can be inspected as a reference for adding new backends.

SNS specifics

The SNS backend handles pub/sub on AWS. publish() signs each request with sigv4 and POSTs to the regional SNS endpoint (https://sns.<region>.amazonaws.com/). subscribe() polls a paired SQS queue: the SNS->SQS subscription is set up out of band (aws sns subscribe ... --protocol sqs --endpoint <queue-arn>), then subscribe(handler) drives the SQS consume loop and forwards each delivered notification to the handler.

let topic = messaging.topic({
    "driver":    "sns",
    "region":    "us-east-1",
    "topicArn":  "arn:aws:sns:us-east-1:123:orders",
    "queueUrl":  "https://sqs.us-east-1.amazonaws.com/123/orders-sub",
    "accessKey": env.get("AWS_ACCESS_KEY_ID"),
    "secretKey": env.get("AWS_SECRET_ACCESS_KEY")
});
topic.publish({"orderId": 42});
topic.subscribe(func(any msg): void {
    let payload = msg["body"] as string;
    /* payload is the raw SNS notification JSON; if you want just
     * the original Message field, parse and extract:
     * json.parse(payload)["Message"]. */
});

Required options:

Key Notes
driver "sns"
region AWS region of the topic.
topicArn Full ARN of the SNS topic.
accessKey IAM access key ID with sns:Publish (and sqs:ReceiveMessage on the subscription queue if subscribing).
secretKey IAM secret access key.
queueUrl Optional. SQS queue subscribed to the topic. Required to call subscribe(); omit if you only publish.

Publish-only setups can skip queueUrl; calling subscribe() without one throws with a clear error. The implementation is in stdlib/messaging/sns.gb.

Adding a backend

Implement a class with the same method set as the SQS backend (the messaging.MessageQueue interface), register it under a new driver name in messaging.gb, and write tests under tests/stdlib/messaging_*_test.gb. The interface is small by design - one method per logical operation, no broker-specific features leak into the facade.