
Module amqp

Description

A low-level Chicken Scheme client for the Advanced Message Queuing Protocol (AMQP) v0.9.1.

Limitations

Authors

Fredrik Appelberg (fredrik@appelberg.me)

Repository

https://github.com/fred-o/chicken-amqp

Requirements

Examples

Producer

;; Establish a connection and open a channel
(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Declare a durable topic exchange
(amqp-exchange-declare chan "my-exchange" "topic" durable: 1)

;; Publish a message with a plain text payload
(amqp-publish-message chan "my-exchange" "my-routing-key" "hello, world" '((content-type "text/plain")))

Consumer

;; Establish a connection and open a channel
(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Declare the previously created exchange passively. Not strictly
;; necessary, but good practice. The exchange type is a required
;; positional argument, so it is repeated here.
(amqp-exchange-declare chan "my-exchange" "topic" passive: 1)

;; Declare an anonymous queue. The server will assign a random name, so
;; we need to check the return data to get it.
(let ((q (alist-ref 'queue (amqp-queue-declare chan "" auto-delete: 1))))
  ;; Bind the queue to our exchange
  (amqp-queue-bind chan q "my-exchange" "#")

  ;; Tell the server that we want to start consuming messages
  (amqp-basic-consume chan q)

  ;; Loop forever, receiving messages and printing the payload
  (let loop ()
    (let ((msg (amqp-receive-message chan)))
      (print (blob->string (amqp-message-payload msg)))
      (loop))))

API

Concurrency

Each AMQP connection starts two SRFI-18 threads: one for reading and dispatching incoming frames, and one for handling heartbeats.

All operations use SRFI-18 mutexes to ensure thread safety, and it should be perfectly fine to have multiple threads accessing the same channel. The mailbox egg is used internally when data needs to be passed between threads.

Conditions

AMQP handles errors by simply closing down the offending channel and/or connection. When this happens, an amqp condition with reply-text and reply-code properties is raised, and the channel or connection object becomes invalid.
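A minimal sketch of trapping such an error. It assumes that the condition kind is named amqp and that the properties are stored under the symbols reply-code and reply-text; the text above names them but does not show how they are accessed. condition-case and get-condition-property are standard CHICKEN (the import line assumes CHICKEN 5 module names), conn is a connection obtained from amqp-connect, and the queue name is hypothetical.

```scheme
(import amqp (chicken condition))

;; Assumes `conn` was obtained from amqp-connect.
(define chan (amqp-channel-open conn))

;; Passively declaring a queue that does not exist is an error, so the
;; server closes the channel and a condition is raised.
(condition-case
    (amqp-queue-declare chan "no-such-queue" passive: 1)
  (e (amqp)
     ;; ASSUMPTION: kind `amqp`, properties `reply-code` / `reply-text`.
     (print "AMQP error "
            (get-condition-property e 'amqp 'reply-code #f)
            ": "
            (get-condition-property e 'amqp 'reply-text #f))
     ;; The old channel object is now invalid, so open a fresh one.
     (set! chan (amqp-channel-open conn))))
```

Because the failed channel cannot be reused, recovery code normally has to reopen a channel (or reconnect) before retrying.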

Connections

[procedure] (amqp-connect URL)

Creates a new AMQP connection and returns a connection object. The URL parameter should have the format amqp://[user][:password@][host]/[vhost].

[procedure] (amqp-disconnect CONNECTION)

Close an AMQP connection.

[constant] amqp-debug
default: (make-parameter #f)

Set amqp-debug to #t to enable detailed debug logging of sent and received frames.
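Since the default value is created with make-parameter, amqp-debug is changed by calling it with the new value rather than with set!. A short sketch, assuming CHICKEN 5 import syntax and that the setting is made before the connection is opened:

```scheme
(import amqp)

;; Enable frame logging before connecting.
(amqp-debug #t)

(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))

;; Calling the parameter with no arguments returns its current value.
(print "debug logging enabled? " (amqp-debug))
```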

Receiving messages

[procedure] (amqp-receive-message CHANNEL)

Block until the next amqp-message can be read on the given CHANNEL.

The client will receive messages from any of the following AMQP methods:

  • basic.deliver (after basic.consume on the same channel)
  • basic.return (for messages sent with mandatory: 1 or immediate: 1 that could not be routed or delivered immediately by the server)
  • basic.get-ok (in response to basic.get)

[record] amqp-message
constructor: (make-amqp-message DELIVERY PROPERTIES PAYLOAD)
predicate: amqp-message?
implementation: define-record

field       getter                   setter
delivery    amqp-message-delivery    amqp-message-delivery-set!
properties  amqp-message-properties  amqp-message-properties-set!
payload     amqp-message-payload     amqp-message-payload-set!

Record that holds received messages. The delivery slot holds an alist of delivery information from the server. The properties slot is an alist of AMQP message properties.

[constant] amqp-payload-conversion
default: (make-parameter bitstring->blob)

The amqp egg uses bitstring to handle binary data internally, but to avoid leaking that abstraction, message payloads returned by amqp-receive-message are converted to standard blobs. If you don't want this behaviour, set the parameter amqp-payload-conversion to either #f or a function that accepts a bitstring and returns the desired payload format.
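As an illustration, the conversion could be replaced so that payloads arrive as strings. This is only a sketch: it composes bitstring->blob (the documented default) with the standard blob->string, and the import line assumes CHICKEN 5 module names.

```scheme
(import amqp bitstring (chicken blob))

;; Deliver payloads as strings instead of blobs.
(amqp-payload-conversion
 (lambda (bits)
   (blob->string (bitstring->blob bits))))

;; Alternatively, disable conversion and receive the raw bitstring:
;; (amqp-payload-conversion #f)
```

With the string conversion installed, consumers can print (amqp-message-payload msg) directly. Setting the parameter before opening the connection is the cautious choice, since the text does not say which thread performs the conversion.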

Publishing messages

[procedure] (amqp-publish-message CHANNEL EXCHANGE ROUTING-KEY PAYLOAD PROPERTIES #!key (MANDATORY 0) (IMMEDIATE 0))

Publish a message. The PAYLOAD can be a u8vector, string, vector or blob. PROPERTIES should be an alist of AMQP message properties.
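A small sketch of publishing with the mandatory flag set. It assumes chan is an open channel and that "my-exchange" already exists; the property alist follows the same shape as the producer example, and the JSON payload is purely illustrative.

```scheme
(import amqp)

;; Assumes `chan` is an open channel and "my-exchange" has been declared.
;; mandatory: 1 asks the server to hand the message back (basic.return)
;; if it cannot be routed to any queue.
(amqp-publish-message chan "my-exchange" "my-routing-key"
                      "{\"greeting\": \"hello\"}"
                      '((content-type "application/json"))
                      mandatory: 1)
```

A returned message would then show up through amqp-receive-message on the same channel, as described under Receiving messages.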

AMQP Commands

The AMQP command functions map 1:1 to the commands specified by the AMQP specification.

The Channel Class

[procedure] (amqp-channel-open CONNECTION)

Returns a new channel object that is then passed as an argument to the other functions.

[procedure] (amqp-channel-flow CHANNEL ACTIVE)
[procedure] (amqp-channel-close CHANNEL #!key (REPLY-CODE 0) (REPLY-TEXT ) (METHOD-ID 0) (CLASS-ID 0))

The Exchange Class

[procedure] (amqp-exchange-declare CHANNEL EXCHANGE TYPE #!key (PASSIVE 0) (DURABLE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-exchange-delete CHANNEL EXCHANGE #!key (IF-UNUSED 0) (NO-WAIT 0))

The Queue Class

[procedure] (amqp-queue-declare CHANNEL QUEUE #!key (PASSIVE 0) (DURABLE 0) (EXCLUSIVE 0) (AUTO-DELETE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-queue-bind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-queue-purge CHANNEL QUEUE #!key (NO-WAIT 0))
[procedure] (amqp-queue-delete CHANNEL QUEUE #!key (IF-UNUSED 0) (IF-EMPTY 0) (NO-WAIT 0))
[procedure] (amqp-queue-unbind CHANNEL QUEUE EXCHANGE ROUTING-KEY #!key (ARGUMENTS '()))

The Basic Class

[procedure] (amqp-basic-qos CHANNEL PREFETCH-SIZE PREFETCH-COUNT GLOBAL)
[procedure] (amqp-basic-consume CHANNEL QUEUE #!key (CONSUMER-TAG ) (NO-LOCAL 0) (NO-ACK 0) (EXCLUSIVE 0) (NO-WAIT 0) (ARGUMENTS '()))
[procedure] (amqp-basic-cancel CHANNEL CONSUMER-TAG #!key (NO-WAIT 0))
[procedure] (amqp-basic-get CHANNEL QUEUE #!key (NO-ACK 0))
[procedure] (amqp-basic-ack CHANNEL DELIVERY-TAG #!key (MULTIPLE 0))
[procedure] (amqp-basic-reject CHANNEL DELIVERY-TAG #!key (REQUEUE 0))
[procedure] (amqp-basic-recover CHANNEL REQUEUE)
[procedure] (amqp-basic-recover-async CHANNEL REQUEUE)

The Transaction Class

[procedure] (amqp-tx-select CHANNEL)
[procedure] (amqp-tx-commit CHANNEL)
[procedure] (amqp-tx-rollback CHANNEL)
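
To show how several of these commands fit together, here is a hedged sketch of a consumer that acknowledges each message explicitly. It assumes chan is an open channel, that a queue named "my-queue" (hypothetical) exists, and that the delivery alist of a received message carries the tag under the key delivery-tag; that key name is an assumption based on the AMQP field name, not something this page confirms.

```scheme
(import amqp (chicken blob))

;; Assumes `chan` is an open channel and "my-queue" exists.
;; Ask the server to send at most one unacknowledged message at a time.
(amqp-basic-qos chan 0 1 0)

;; no-ack: 0 (the default) means every message must be acknowledged.
(amqp-basic-consume chan "my-queue" no-ack: 0)

(let loop ()
  (let* ((msg (amqp-receive-message chan))
         ;; ASSUMPTION: the delivery alist is keyed by 'delivery-tag.
         (tag (alist-ref 'delivery-tag (amqp-message-delivery msg))))
    (print (blob->string (amqp-message-payload msg)))
    ;; Acknowledge only after the message has been processed.
    (amqp-basic-ack chan tag)
    (loop)))
```

Acknowledging after processing means a message is redelivered if the consumer dies mid-way, at the cost of possible duplicates.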

License

BSD

Version History
