Federation with RabbitMQ

RabbitMQ is a feature-rich message broker, and it's hard to be aware of all its capabilities. Still, knowing the useful ones helps you avoid poor solutions and reinventing the wheel. In this series about RabbitMQ, I'll describe only those features that I've used in practice at least once, and I'd like to start with the Federation plugin.

What is the Federation plugin?

The Federation plugin lets you work with multiple distributed RabbitMQ brokers as if you had only one. It copies messages between brokers in a fault-tolerant way, properly handling connectivity issues and unavailable receivers.

Why would I use it?

One valid use case is a group of companies, each with its own infrastructure and security policies, that need to be integrated with one another.

If you work in a big company, it's not uncommon for it to acquire other companies from time to time. Eventually, the group may look like the following diagram:

Group of companies diagram

A naive scenario: whenever we register a new user in the parent company, we also want to onboard them in the products of companies A and B.

In such a setup, the Federation plugin can make the transfer of messages between the parent company and companies A and B transparent for applications on both sides. More on that later.

A bit of background

Before we move on to the implementation, let's briefly go through some concepts of event-driven architecture, as they will help us understand which federation setup we need.

In our naive scenario, a service in the parent company owns users. Information about the user domain (its events) is important for the business, so we need to organise the flow properly. There are two broad ways in which services can communicate with each other or share the state of their aggregates: orchestration and choreography.

With the choreography approach, User service broadcasts user-related facts (emits events) such as user.created, user.updated and user.deleted:

Broadcast user facts diagram

As you can see, User service owns the user exchange. What's also important is that in broadcast mode, User service should neither know nor care who the receivers of those events are, or whether there are any at all. The scope of the service in this context ends when it publishes events to the user exchange, and it should stay that way no matter what integrations with other services we need.

For example, if another service is interested in user.created, user.updated and similar events, it can easily be plugged in without any changes to User service:

Pluggable architecture

In this example, Finance service owns the finance.user queue. User service has nothing to do with it and must not be aware of its existence. This way we can plug in as many interested parties as needed.
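
To make the "plug in without changes" idea concrete, here is a minimal in-memory sketch of a topic exchange. It is not RabbitMQ code: TopicExchange and topic_matches are hypothetical helpers, plain Python lists stand in for queues, and the user.* binding key is an assumption about what Finance service would subscribe to.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more words."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        if p[0] == "#":
            # '#' either consumes nothing, or consumes one word and stays active
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])

    return match(pattern.split("."), routing_key.split("."))


class TopicExchange:
    """In-memory stand-in for the `user` topic exchange (not a real broker)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.bindings: list[tuple[str, list]] = []

    def bind(self, queue: list, binding_key: str) -> None:
        self.bindings.append((binding_key, queue))

    def publish(self, routing_key: str, body: dict) -> int:
        """Deliver to every matching binding; return the number of deliveries."""
        delivered = 0
        for binding_key, queue in self.bindings:
            if topic_matches(binding_key, routing_key):
                queue.append((routing_key, body))
                delivered += 1
        return delivered


user_exchange = TopicExchange("user")

# No consumers yet: publishing still succeeds, the event is simply dropped.
assert user_exchange.publish("user.created", {"id": 1}) == 0

# Finance plugs in later by binding its own queue; the publisher is unchanged.
finance_user_queue: list = []
user_exchange.bind(finance_user_queue, "user.*")
user_exchange.publish("user.created", {"id": 2})
user_exchange.publish("user.deleted", {"id": 2})
print(finance_user_queue)
```

The publishing side never references finance_user_queue, which is the property we want to preserve once the consumer lives in another company's broker.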

How can I do it?

What we need to achieve is exactly the same architecture as in the previous example. The only difference is that this time Finance (or any other) service belongs to another company in the group.

Set up Federation plugin

First of all, we need RabbitMQ with the management and federation plugins up and running, so we can test our implementation step by step. For that, I've prepared a few files:

# Dockerfile
# base RabbitMQ image with a management plugin
FROM rabbitmq:management
# enable federation plugin
RUN rabbitmq-plugins enable rabbitmq_federation
# enable federation management plugin to be able to see information about configured federations later in UI
RUN rabbitmq-plugins enable rabbitmq_federation_management

And

# docker-compose.yaml
version: "3"

services:
  parent_company_rabbitmq:
    image: rabbitmq:management
    ports:
      - 8080:15672
  company_a_rabbitmq:
    image: rabbitmq-management-with-federation
    build:
      context: .
      dockerfile: ./Dockerfile
    ports:
      - 8081:15672

In the docker-compose.yaml file, I defined two services: one represents the parent company and the other represents one of the other companies in the group.

To start everything, we just need to run:

docker-compose up

After that, we should have both RabbitMQ instances up and running in the same network (just for simplicity of the example), and only company A (the downstream) should have the federation plugin enabled. They can be accessed via:

http://localhost:8080 - parent company

http://localhost:8081 - company A

Please use guest/guest to log in to the management panel.

Set up User service

Broadcast user facts diagram

As per the image above, the only thing User service needs in RabbitMQ is to create and own an exchange called user. Let's create it:

CLI:

docker-compose exec parent_company_rabbitmq \
rabbitmqadmin declare exchange name=user type=topic

UI:

Go to http://localhost:8080/#/exchanges and add a new exchange in the "Add a new exchange" section:

add new user exchange 

Even with a federation setup, nothing changes for the parent company and its User service, so we keep the promise of transparently transferring messages to our child company A.

Set up Finance service

RabbitMQ has its own terminology for federation setups. Let's go through it to be aligned:

Upstream - a definition of a connection to the broker from which messages will be copied (the source). In our case, it is an upstream created in company_a_rabbitmq that points to parent_company_rabbitmq.

Upstream set - as the name suggests, a group of upstreams. For example, multiple different upstreams that point to the parent company's RabbitMQ broker.

Policy - selects, by name pattern, which exchanges/queues in the downstream are federated and which upstream (or upstream set) they receive messages from.

Now comes the most interesting part: the actual work with the Federation plugin. It can be done in at least two different ways, each with its pros and cons. Let's consider both.

One to one naming

With this approach, if you have an exchange called user in the upstream, you have to give it the same name, user, in the downstream.

First, we need to configure an upstream. It can be done via:

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqctl set_parameter federation-upstream parent-company-upstream \
'{"uri":"amqp://parent_company_rabbitmq:5672","expires":3600000}'

UI:

In order to add a new upstream via UI, go to http://localhost:8081/#/federation-upstreams and add a new upstream as per the following image:

add upstream

Here we specified that the name of the new upstream is parent-company-upstream and that it points to parent_company_rabbitmq. It's also important to specify expires. In our case it's set to 3600000 ms, that is 1 hour. If there are connectivity issues or our downstream is down, messages are buffered in an internal queue on the upstream's side, and expires defines how long that queue is kept while the link is disconnected. This lets the setup survive outages of up to 1 hour without losing messages; after that, the queue and the messages buffered in it are removed.

We want all messages published to the user exchange in parent_company_rabbitmq to be copied to company_a_rabbitmq. To do so, we need to add a policy:
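
The same upstream definition can also be sent to the management HTTP API, which is what the UI form uses behind the scenes. The sketch below is a minimal, non-authoritative example that only builds the request without sending it. It assumes the port mapping from docker-compose.yaml and the guest/guest credentials; the helper name upstream_request and its expires_hours parameter are made up for illustration.

```python
import json
import urllib.request
from base64 import b64encode


def upstream_request(
    api_base: str,
    name: str,
    uri: str,
    expires_hours: float,
    vhost: str = "%2F",  # URL-encoded name of the default vhost "/"
    user: str = "guest",
    password: str = "guest",
) -> urllib.request.Request:
    """Build (but do not send) the request that defines a federation upstream."""
    # `expires` is given in milliseconds, so convert from hours here
    value = {"uri": uri, "expires": int(expires_hours * 60 * 60 * 1000)}
    token = b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{api_base}/api/parameters/federation-upstream/{vhost}/{name}",
        data=json.dumps({"value": value}).encode(),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


request = upstream_request(
    api_base="http://localhost:8081",  # company A, the downstream
    name="parent-company-upstream",
    uri="amqp://parent_company_rabbitmq:5672",
    expires_hours=1,
)
print(request.get_method(), request.full_url)
print(request.data.decode())
# To apply it against the running container: urllib.request.urlopen(request)
```

Keeping the conversion from hours to milliseconds in one place makes it harder to mistype a value such as 3600000.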

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqctl set_policy --apply-to exchanges user-policy "^user$" \
'{"federation-upstream-set":"all"}'

UI:

Go to http://localhost:8081/#/policies and add a new policy:

add a new policy

By doing this, we created a policy named user-policy that matches (pattern) the exchange (apply-to) named user and federates it with the upstream we created earlier.

Previously, when we created the upstream, we didn't assign it to any upstream set, so it is implicitly part of the default set called all, which contains every defined upstream. That's why we specified federation-upstream-set as all.
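
The policy pattern is a regular expression, so the anchors in "^user$" matter. The sketch below uses Python's re module as a stand-in for the broker's own regex engine, assuming an unanchored search; the exchange names are hypothetical and matched_exchanges is an illustrative helper, not part of RabbitMQ.

```python
import re


def matched_exchanges(pattern: str, exchange_names: list[str]) -> list[str]:
    """Names a policy pattern would select, using an unanchored regex search."""
    return [name for name in exchange_names if re.search(pattern, name)]


# Hypothetical exchange names in the downstream broker.
exchanges = ["user", "user.dead-letter", "superuser", "order"]

print(matched_exchanges("^user$", exchanges))  # → ['user']
print(matched_exchanges("^user", exchanges))   # → ['user', 'user.dead-letter']
print(matched_exchanges("user", exchanges))    # → ['user', 'user.dead-letter', 'superuser']
```

Without the anchors, the policy would also federate exchanges that merely contain the word user, which is rarely what you want.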

The last missing bit is to actually create an exchange called user in our downstream and test the whole setup.

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqadmin declare exchange name=user type=topic

UI:

Go to http://localhost:8081/#/exchanges and add a new exchange in the "Add a new exchange" section:

add new user exchange 

After that, a federation link should be created, and its status shows whether everything was configured properly. We can check it via:

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqctl federation_status

UI:

Go to http://localhost:8081/#/federation

federation status

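If the setup is correct, the state will be running. Now we just need to publish a message to the user exchange in the upstream and double-check that it arrives in the downstream. Bind a queue to the downstream user exchange first: a federated exchange only receives messages that match one of its bindings.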
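
The status check can also be scripted. The sketch below assumes that the federation management plugin exposes links at GET /api/federation-links and that each link carries upstream, exchange (or queue) and status fields. The sample payload is hand-written for illustration, not captured from a broker, and the order exchange in it is hypothetical, so verify the field names against your RabbitMQ version.

```python
import json


def links_not_running(links: list[dict]) -> list[str]:
    """Return one readable line per federation link whose status is not 'running'."""
    problems = []
    for link in links:
        if link.get("status") != "running":
            target = link.get("exchange") or link.get("queue") or "?"
            upstream = link.get("upstream", "?")
            status = link.get("status", "unknown")
            problems.append(f"{upstream} -> {target}: {status}")
    return problems


# Hypothetical payload, shaped like the assumed /api/federation-links response.
sample = json.loads("""
[
  {"upstream": "parent-company-upstream", "exchange": "user",
   "type": "exchange", "status": "running"},
  {"upstream": "parent-company-upstream", "exchange": "order",
   "type": "exchange", "status": "starting"}
]
""")

print(links_not_running(sample))
```

An empty list means every link reported running, which makes the function easy to use in a smoke test after provisioning.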

Pros:

  • easy to set up: we need only one upstream for our parent company
  • we can also use a broader pattern in our policy to federate any number of exchanges (or create multiple policies)

Cons:

  • 1:1 naming. With this approach, exchanges in the downstream must have the same names as in the upstream. This is not always desirable: it may lead to confusion and name conflicts, and it may go against established naming conventions in the downstream.

Use custom naming for local exchanges

To solve the naming issues of the previous approach, we can configure upstreams and policies differently. Instead of a generic upstream, we can create one that points at a single, specific exchange:

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqctl set_parameter federation-upstream parent-company-upstream \
'{"uri":"amqp://parent_company_rabbitmq:5672","expires":3600000, "exchange": "user"}'

UI:

Go to http://localhost:8081/#/federation-upstreams and add an upstream:

fixed upstream

The important part here is exchange, which is set to user. It means that this upstream receives only messages published to the user exchange in the upstream broker.

Now we can create a policy with a different naming pattern for local exchanges.

CLI:

docker-compose exec company_a_rabbitmq \
rabbitmqctl set_policy --apply-to exchanges custom-named-user-policy "^parent-company\.user$" \
'{"federation-upstream-set":"all"}'

UI:

Go to http://localhost:8081/#/policies and add a new policy:

precise policy example

By doing so, we've built the following relationship:

All messages from the user exchange in the upstream are copied to the parent-company.user exchange in the downstream, so we can now use any local names we like. As in the first approach, the parent-company.user exchange still has to be declared in the downstream before the link appears.

To test this federation, publish a message to the user exchange in the upstream and check that it arrives in the downstream's parent-company.user exchange or in the queues bound to it.
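
One detail worth knowing when a local exchange name contains a dot: in a policy pattern, an unescaped dot matches any character. The short sketch below again uses Python's re module as a stand-in for the broker's regex engine; parent-company-user is a hypothetical exchange name.

```python
import re

loose = r"^parent-company.user$"    # '.' matches any single character
strict = r"^parent-company\.user$"  # '\.' matches only a literal dot

for name in ["parent-company.user", "parent-company-user"]:
    print(name, bool(re.search(loose, name)), bool(re.search(strict, name)))
```

Both patterns select parent-company.user, but only the loose one would also select parent-company-user.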
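
This test can be scripted through the management HTTP API as well. The sketch below only builds the two requests and does not send them. It assumes the port mappings from docker-compose.yaml, the guest/guest credentials, and a downstream queue named finance.user that is already bound to parent-company.user; api_post is an illustrative helper and the payload is made up.

```python
import json
import urllib.request
from base64 import b64encode


def api_post(
    base: str, path: str, body: dict, user: str = "guest", password: str = "guest"
) -> urllib.request.Request:
    """Build (but do not send) an authenticated JSON POST to the management API."""
    token = b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base}/api/{path}",
        data=json.dumps(body).encode(),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


# 1. Publish a test event to the `user` exchange in the upstream (parent company).
publish = api_post(
    "http://localhost:8080",
    "exchanges/%2F/user/publish",
    {
        "properties": {},
        "routing_key": "user.created",
        "payload": json.dumps({"id": 42}),
        "payload_encoding": "string",
    },
)

# 2. Fetch one message from a queue in the downstream (company A).
#    `finance.user` is assumed to exist and to be bound to `parent-company.user`.
fetch = api_post(
    "http://localhost:8081",
    "queues/%2F/finance.user/get",
    {"count": 1, "ackmode": "ack_requeue_false", "encoding": "auto"},
)

for request in (publish, fetch):
    print(request.get_method(), request.full_url)
# To run against the containers: urllib.request.urlopen(publish), then urlopen(fetch)
```

The queue has to be declared and bound before the message is published, because a federated exchange only receives messages that match one of its bindings.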

Pros:

  • this approach supports custom naming

Cons:

  • requires setup of extra upstreams and policies

Conclusion

In this blog post, I've only briefly touched on federation in RabbitMQ and the scenarios in which it may be useful. I strongly recommend reading the documentation to get a deeper understanding of the topic and to cover other important parts, such as security, which was omitted from these simplified examples.