A low-level Chicken Scheme client for the Advanced Message Queuing Protocol (AMQP) v0.9.1.
- Only AMQP 0.9.1 is supported, and 1.0 support is unlikely to be added.
- Only PLAIN authentication is supported.
- No SSL support.
Fredrik Appelberg (firstname.lastname@example.org)
    ;; Establish a connection and open a channel
    (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
    (define chan (amqp-channel-open conn))

    ;; Declare a durable topic exchange
    (amqp-exchange-declare chan "my-exchange" "topic" durable: 1)

    ;; Publish a message with a plain text payload
    (amqp-publish-message chan "my-exchange" "my-routing-key" "hello, world"
                          '((content-type "text/plain")))
    ;; Establish a connection and open a channel
    (define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
    (define chan (amqp-channel-open conn))

    ;; Declare the previously created exchange passively. Not strictly
    ;; necessary, but good practice.
    (amqp-exchange-declare chan "my-exchange" "topic" passive: 1)

    ;; Declare an anonymous queue. The server will assign a random name, so
    ;; we need to check the return data to get it.
    (let ((q (alist-ref 'queue (amqp-queue-declare chan "" auto-delete: 1))))
      ;; Bind the queue to our exchange
      (amqp-queue-bind chan q "my-exchange" "#")
      ;; Tell the server that we want to start consuming messages
      (amqp-basic-consume chan q)
      ;; Loop forever, receiving messages and printing the payload
      (let loop ()
        (let ((msg (amqp-receive-message chan)))
          (print (blob->string (amqp-message-payload msg)))
          (loop))))
Each AMQP connection starts two SRFI-18 threads: one for reading and dispatching incoming frames, and one for handling heartbeats.
All operations use SRFI-18 mutexes to ensure thread safety, and it should be perfectly fine to have multiple threads accessing the same channel. The mailbox egg is used internally when data needs to be passed between threads.
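The sharing described above might look like the following sketch, in which two SRFI-18 threads publish on the same channel. It assumes CHICKEN 5 import syntax, a reachable broker at the placeholder URL, and an existing exchange named "my-exchange"; `start-publisher` is a hypothetical helper, not part of the egg.

```scheme
(import amqp srfi-18)

(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Hypothetical helper: start a thread that publishes COUNT messages
;; on the shared channel and return the thread object.
(define (start-publisher name count)
  (thread-start!
   (make-thread
    (lambda ()
      (do ((i 0 (+ i 1)))
          ((= i count))
        (amqp-publish-message chan "my-exchange" "my-routing-key"
                              (string-append name " #" (number->string i))
                              '())))
    name)))

;; Both threads use the same channel object; wait for them to finish.
(let ((threads (list (start-publisher "worker-a" 10)
                     (start-publisher "worker-b" 10))))
  (for-each thread-join! threads))

(amqp-disconnect conn)
```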
AMQP handles errors by simply closing down the offending channel and/or connection. When this happens an amqp condition with reply-text and reply-code properties is raised, and the channel/connection object becomes invalid.
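A minimal sketch of catching such a condition, assuming CHICKEN 5 module names, that the condition kind is `amqp`, and that the properties are stored under the keys `reply-code` and `reply-text` as described above. `amqp-error-summary` and `call-with-amqp-errors-caught` are hypothetical helpers, not part of the egg.

```scheme
(import (chicken condition))

;; Hypothetical helper: extract the reply code and text from a condition.
;; Assumes both properties live under the condition kind `amqp`;
;; a missing property yields #f.
(define (amqp-error-summary exn)
  (list (get-condition-property exn 'amqp 'reply-code #f)
        (get-condition-property exn 'amqp 'reply-text #f)))

;; Hypothetical helper: call THUNK and return its result, or the summary
;; list if a condition of kind `amqp` is raised. Other conditions propagate.
(define (call-with-amqp-errors-caught thunk)
  (condition-case (thunk)
    (exn (amqp) (amqp-error-summary exn))))
```

Since the channel or connection object is invalid once the condition has been raised, a caller would normally open a fresh channel (or reconnect) before retrying.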
- amqp-connect URL [procedure]
Creates a new AMQP connection and returns a connection object. The URL parameter should have the format amqp://[user][:password@][host]/[vhost]
- amqp-disconnect CONNECTION [procedure]
Close an AMQP connection.
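One way to make sure a connection is always closed is to pair the two procedures in a wrapper. The sketch below assumes CHICKEN 5 import syntax; `call-with-amqp-connection` is a hypothetical helper, not part of the egg.

```scheme
(import amqp)

;; Hypothetical helper: open a connection, pass it to PROC, and
;; disconnect when PROC returns or escapes through an error.
(define (call-with-amqp-connection url proc)
  (let ((conn (amqp-connect url)))
    (dynamic-wind
      (lambda () #f)                         ; nothing to do on entry
      (lambda () (proc conn))                ; run the caller's code
      (lambda () (amqp-disconnect conn)))))  ; always close on exit

;; Usage: open a channel and declare an exchange, then disconnect.
(call-with-amqp-connection
 "amqp://myuser:mypassword@myserver/myvhost"
 (lambda (conn)
   (let ((chan (amqp-channel-open conn)))
     (amqp-exchange-declare chan "my-exchange" "topic" durable: 1))))
```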
- amqp-debug: (make-parameter #f) [parameter]
Set amqp-debug to #t to enable detailed debug logging of sent and received frames.
- amqp-receive-message CHANNEL [procedure]
Block until the next amqp-message can be read on the given CHANNEL.
The client will receive messages from any of the following AMQP methods:
- basic.deliver (after basic.consume on the same channel)
- basic.return (for messages sent with mandatory: 1 or immediate: 1 that could not be routed or delivered immediately by the server)
- basic.get-ok (in response to basic.get)
- (make-amqp-message DELIVERY PROPERTIES PAYLOAD)
    field       getter                   setter
    delivery    amqp-message-delivery    amqp-message-delivery-set!
    properties  amqp-message-properties  amqp-message-properties-set!
    payload     amqp-message-payload     amqp-message-payload-set!
Record that holds received messages. The delivery slot holds an alist of delivery information from the server. The properties slot is an alist of AMQP message properties.
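As an illustration of reading the three slots, the sketch below collects a few values from a message. It assumes CHICKEN 5 module names, the default blob payload conversion, and that the alists use dotted pairs with the keys `routing-key` and `content-type`; those key names are assumptions and should be checked against actual messages. `message-summary` is a hypothetical helper.

```scheme
(import amqp (chicken base) (chicken blob))

;; Hypothetical helper: return (routing-key content-type payload-string).
;; alist-ref returns #f for keys that are not present.
(define (message-summary msg)
  (list (alist-ref 'routing-key (amqp-message-delivery msg))
        (alist-ref 'content-type (amqp-message-properties msg))
        (blob->string (amqp-message-payload msg))))
```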
- amqp-payload-conversion: (make-parameter bitstring->blob) [parameter]
The amqp egg uses bitstring to handle binary data internally, but in an attempt to avoid abstraction leakage, message payloads returned by amqp-receive-message are converted to standard blobs. If you don't want this behaviour, set the parameter amqp-payload-conversion to either #f or a function that accepts a bitstring and returns the desired payload format.
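For example, payloads could be delivered as strings by composing the default converter with blob->string. This is a sketch assuming CHICKEN 5 module names and that the bitstring egg is installed; whether the parameter must be set before the connection is created is not stated here, so setting it first is the cautious ordering.

```scheme
(import amqp bitstring (chicken blob))

;; Convert each received payload from bitstring to blob to string.
(amqp-payload-conversion
 (lambda (bits)
   (blob->string (bitstring->blob bits))))

;; Alternatively, disable conversion and receive raw bitstrings:
;; (amqp-payload-conversion #f)
```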
- amqp-publish-message CHANNEL EXCHANGE ROUTING-KEY PAYLOAD PROPERTIES #!key (MANDATORY 0) (IMMEDIATE 0) [procedure]
Publishes a message. The PAYLOAD can be a u8vector, string, vector or blob. PROPERTIES should be an alist of AMQP message properties.
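The following sketch publishes the same kind of message with several of the accepted payload types and with the MANDATORY flag. It assumes CHICKEN 5 module names, a reachable broker at the placeholder URL, and an existing exchange "my-exchange"; the routing key "unbound-key" is a made-up example.

```scheme
(import amqp (chicken blob) srfi-4)

(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; String payload, no message properties
(amqp-publish-message chan "my-exchange" "my-routing-key" "a string" '())

;; Blob payload
(amqp-publish-message chan "my-exchange" "my-routing-key"
                      (string->blob "a blob") '())

;; u8vector payload
(amqp-publish-message chan "my-exchange" "my-routing-key"
                      (u8vector 104 105) '())

;; Ask the server to return the message if it cannot be routed
(amqp-publish-message chan "my-exchange" "unbound-key" "maybe unroutable" '()
                      mandatory: 1)
```

A message returned by the server because of mandatory: 1 arrives as basic.return and would be picked up by amqp-receive-message on the same channel.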
The AMQP command functions map 1:1 to the commands specified by the AMQP specification.
- The first argument to any command function except amqp-channel-open should be a channel object. For other arguments, consult the AMQP specification.
- All synchronous command functions except amqp-channel-open return an alist of values sent by the server in the reply command. Consult the AMQP specification for details.
- Asynchronous command functions (amqp-basic-ack, amqp-basic-reject, etc.) or synchronous functions called with no-wait: 1 return (void).
- amqp-channel-open returns a new channel object.
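The return conventions above can be sketched with amqp-queue-declare. The example assumes CHICKEN 5 module names, a reachable broker at the placeholder URL, and made-up queue names; only the `queue` key of the reply alist is taken from the consumer example, any other keys would follow the AMQP specification.

```scheme
(import amqp (chicken base))

(define conn (amqp-connect "amqp://myuser:mypassword@myserver/myvhost"))
(define chan (amqp-channel-open conn))

;; Synchronous call: the reply arrives as an alist of server values.
(let ((reply (amqp-queue-declare chan "my-queue" durable: 1)))
  (print "declared queue: " (alist-ref 'queue reply)))

;; Same command with no-wait: 1: the call does not wait for a reply
;; and returns (void), so there is nothing to inspect.
(amqp-queue-declare chan "my-other-queue" durable: 1 no-wait: 1)
```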
- amqp-channel-open CONNECTION [procedure]
Returns a new channel object that is then passed as an argument to the other functions.
- amqp-channel-flow CHANNEL ACTIVE [procedure]
- amqp-exchange-declare CHANNEL EXCHANGE TYPE #!key (PASSIVE 0) (DURABLE 0) (NO-WAIT 0) (ARGUMENTS '()) [procedure]
- amqp-queue-declare CHANNEL QUEUE #!key (PASSIVE 0) (DURABLE 0) (EXCLUSIVE 0) (AUTO-DELETE 0) (NO-WAIT 0) (ARGUMENTS '()) [procedure]
- amqp-basic-consume CHANNEL QUEUE #!key (CONSUMER-TAG ) (NO-LOCAL 0) (NO-ACK 0) (EXCLUSIVE 0) (NO-WAIT 0) (ARGUMENTS '()) [procedure]
- amqp-basic-recover CHANNEL REQUEUE [procedure]
- amqp-tx-select CHANNEL [procedure]
- amqp-tx-commit CHANNEL [procedure]
- amqp-tx-rollback CHANNEL [procedure]
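The three tx commands can be combined into a small wrapper that publishes a batch of messages atomically. This is a sketch assuming CHICKEN 5 module names and that server-side errors are raised as conditions of kind `amqp`; `publish-in-transaction` is a hypothetical helper, not part of the egg.

```scheme
(import amqp (chicken condition))

;; Hypothetical helper: publish every payload in PAYLOADS inside one
;; transaction on CHAN, committing only if all publishes succeed.
(define (publish-in-transaction chan exchange routing-key payloads)
  (amqp-tx-select chan)
  (condition-case
      (begin
        (for-each
         (lambda (payload)
           (amqp-publish-message chan exchange routing-key payload '()))
         payloads)
        (amqp-tx-commit chan))
    ;; An AMQP error has already closed the channel, so there is
    ;; nothing left to roll back; just re-raise.
    (exn (amqp) (abort exn))
    ;; Any other error: roll back the transaction, then re-raise.
    (exn () (amqp-tx-rollback chan) (abort exn))))
```

A call such as (publish-in-transaction chan "my-exchange" "my-routing-key" '("one" "two" "three")) would then either publish all three messages or none of them.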
- 1.0.0 - First public release
- 0.9 - Preparing for a 1.0 release