RabbitMQ essentials with Go examples

February 20, 2018

What is RabbitMQ?



RabbitMQ is an Erlang-based implementation of AMQP (Advanced Message Queuing Protocol) which is an open standard that defines a protocol for systems to exchange messages. RabbitMQ provides support for the STOMP, MQTT, and HTTP protocols by the means of RabbitMQ plug-ins.

Ok, simply speaking it’s a message broker (it accepts and forwards messages).

Core Concepts



  • Publishers (Producers) create messages and publish (send) them to a broker server (RabbitMQ).

  • message has two parts: a payload and a label. The payload can be anything from a JSON array to an MPEG-4. RabbitMQ doesn’t care. The label is more interesting. It describes the payload, and is how RabbitMQ will determine who should get a copy ofyour message. By the time a consumer receives a message, it now only has one part: a payload. The labels attached to the message don’t get passed along with the payload when the message is routed. Messages may be published as persistent, which makes the AMQP broker persist them to disk. If the server is restarted the system ensures that received persistent messages are not lost.

  • Exchange is the initial destination for all published messages. Messages are published to exchanges, which are often compared to post offices or mailboxes.

  • Binding is a virtual connection between an exchange and a queue that enables messages to flow from the former to the latter.

  • Queue is the final destination for messages ready to be consumed. A single message can be copied and can reach multiple queues if the exchange’s routing rule says so. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.

  • Broker is a middleware application that can receive messages produced by publishers and deliver them to consumers or to another broker.

  • Virtual Host is a virtual division in a broker that allows the segregation of publishers, consumers, usually for security reasons.

  • Channel is a logical connection between a publisher/consumer and a broker. Multiple channels can be established within a single connection. Channels allow the isolation of the interaction between a particular client and broker so that they don’t interfere with each other. This happens without opening costly individual TCP connections.

  • Connection is a physical network (TCP) connection between a publisher/consumer and a broker. The connection only closes on client disconnection or in the case of a network or broker failure.

  • Consumers attach to a broker server and subscribe to a queue. Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.


Queues, exchanges and bindings are collectively referred to as AMQP entities.





Whenever you want to deliver a message to a queue, you do it by sending it to an exchange. Then, based on certain rules, RabbitMQ will decide to which queue it should deliver the message. Those rules are called routing keys. A queue is said to be bound to an exchange by a routing key. When you send a message to the broker, it’ll have a routing key—even a blank one—which RabbitMQ will try to match to the routing keys used in the bindings. If they match, then the message will be delivered to the queue. If the routing message doesn’t match any of binding patterns, it’ll be black-holed.


How does the broker handle delivery to multiple queues?


There are four different types of exchanges provided by the AMQP protocol:

  • direct is the default exchange type and is pretty simple: if the routing key matches, then the message is delivered to the corresponding queue. Direct exchanges are often used to distribute tasks between multiple workers (instances of the same application) in a round robin manner.

  • fanout will multicast the received message to the bound queues. The messaging pattern is simple: when you send a message to a fanout exchange, it’ll be delivered to all the queues attached to this exchange. This allows you to react in different ways based on only one message. Examples of usage scenarios: massively multi-player online (MMO) games can use it for leaderboard updates or sport news sites can use fanout exchanges for distributing score updates to mobile clients in near real-time.

  • topic allows messages to arrive to the same queue but coming from different sources. The topic exchange type is often used to implement various publish/subscribe pattern variations like stocks price updates or news updates that involve categorization or tagging.

  • headers allow you to match against a header in the AMQP message instead of the routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. They can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.


The GO RabbitMQ Client


Let’s get the client package:

go get github.com/streadway/amqp


Publishing a message is as easy as (for the sake of simplicity I’m not handling errors here, comments to the process in the code):

// Publisher.go
package main

import (
    "log"

    "github.com/streadway/amqp"
)

func main() {
    //Make a connection
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()

    //Ccreate a channel
    ch, _ := conn.Channel()
    defer ch.Close()

    //Declare a queue
    q, err := ch.QueueDeclare(
        "hello", // name of the queue
        false,   // should the message be persistent? also queue will survive if the cluster gets reset
        false,   // autodelete if there's no consumers (like queues that have anonymous names, often used with fanout exchange)
        false,   // exclusive means I should get an error if any other consumer subsribes to this queue
        false,   // no-wait means I don't want RabbitMQ to wait if there's a queue successfully setup
        nil,     // arguments for more advanced configuration
    )

    //Publish a message
    body := "hello world"
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    log.Printf("Message: %s", body)

}

If you run sudo rabbitmqctl list_queues you should see something like:

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello   1

Running publisher.go again will increase the number of messages in the queue:

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello   2

Now, it’s time to consume/receive the produced message. The code is pretty much the same, the same connection and channel, just a different method is executed on the channel (ch.Consume instead of ch.Publish) with different arguments. Redeclareing the queue is considered a good practice as we might start the consumer before the publisher, so we want to make sure the queue exists before we try to consume messages from it.

msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

Read the messages from a channel:

forever := make(chan bool)

go func() {
    for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
    }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

Once it run, you should see the all messages received/displayed and checking the status of the queue should show 0:

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello   0

The examples are taken from the RabbitMQ documentation, please see the references to the source at the very bottom.


Source:

[1] AMQP concepts

[2] RabbitMQ documentation with Go examples

[3] RabbitMQ in Action: Distributed Messaging for Everyone. Book by Jason J. W. Williams

[4] RabbitMQ Essentials. Book by David Dossot.