Microservices & RabbitMQ On Docker

April 26, 2018 0 Comments

Microservices & RabbitMQ On Docker

 

 


A microservices-based architecture involves decomposing your monolith app into multiple, totally independently deployable and scalable services. Beyond this base definition, what constitutes a microservice can be somewhat subjective, though there are several battle-tested practices adopted by giants like Netflix and Uber that should always be considered. And I'll discuss some of them. Ultimately, we want to divide our app into smaller apps, each of which is a system apart & deals with only one aspect of the whole app and does it really well. This decomposition is a very consequential step and can be done on the basis of subdomains, which have to be identified correctly. The smaller apps are more modular & manageable with well-defined boundaries, can be written using different languages/frameworks, fail in isolation so that the entire app doesn't go down (no SPOF). Take a Cinema ticketing example:



Source: https://codeburst.io/build-a-nodejs-cinema-api-gateway-and-deploying-it-to-docker-part-4-703c2b0dd269

Let's break down this bird's eye view:

i) The user app can be a mobile client, SPA etc or any client consuming our backend services.

ii) It's considered a bad practice to ask our clients to communicate with each of our services separately, for reasons I tried to explain here. This is what the API gateways are for: to receive client requests, call our service(s), return a response. Thus the client only has to talk to one server, giving the illusion of a monolith. Multiple gateways may be used for different kinds of clients (mobile apps, tablets, browsers etc.). They can and should be responsible for stuff like merging/joining responses from services, authentication, ACLs. In large applications, which need to scale and move dynamically, gateways also need access to a Service Registry which holds the locations of our microservice instances, databases etc.

iii) Each service has its own storage. This is a key point and ensures loose coupling. Some queries will then need to join data that is owned by multiple services. To avoid this major performance hit, data may be replicated and sharded. This design principle is not just tolerated in microservices, but encouraged.

iv) The REST calls made to our API gateway are passed to the services, which in turn talk to other services, return a result to the gateway which, perhaps, compiles it and responds with it to the client. Communication like this, among services, on one client request to the app should not happen. Otherwise we'll be sacrificing performance, on account of another HTTP round-trip, for the newly introduced modularity.

A single request should, ideally, only invoke one service to fetch the response. This means any synchronous requests between the services should be minimized, and that's not always possible; mechanisms like gRPC, Thrift or even simple HTTP (as in our example) are commonly employed when necessary. As you may have guessed, this implies that data will have to be replicated across our services. Say, the GET /catalog/<<cityId>> endpoint is also supposed to return the premieres at each cinema in the city at that time. With our new strategy, the premieres will have to be stored in the database for the Cinema Catalog service as well. Hence, point iii).

Communicating Asynchronously Between The Services

So, say, the premieres change as a result of some CRUD operation on the Movies service. To keep the data in sync, that update event will have to be emitted and applied to the Cinema Catalog service as well. Try to picture our microservices as a cluster of state machines where updates in states may have to be communicated across the cluster to achieve eventual consistency. Of course, we should never expect our end-users to have to wait longer for requests to finish and sacrifice their time for modularity to our benefit. Thereby, all of this communication has to be non-blocking. And that's where RabbitMQ comes in.

RabbitMQ is a very powerful message broker that implements the AMQP messaging protocol. Here's the abstract: first, you install a RabbitMQ server instance (broker) on a system. Then a publisher/producer program connects to this server and sends out a message. RabbitMQ queues that message and siphons it off to a single or multiple subscriber/consumer programs that are out there listening on the RabbitMQ server.

Before I get to the crux of this article, I want to explicitly declare that microservices are way more complex and we won't be covering critical topics like fault tolerance because of the intricacy of distributed systems, the full role of the API gateway, Service Discovery, data consistency patterns like Sagas, preventing service failure cascading using Circuit Breakers, health checks and architectural patterns like CQRS. Not to mention how to decide whether microservices will work for you or not.

How RabbitMQ Works

More specifically, the messages are published to an exchange inside the RabbitMQ broker. The exchange then distributes copies of that message to queues on the basis of certain developer-defined rules called bindings. This part of the messages' journey is called routing. And this indirection is of course what makes for the non-blocking message transmissions. Consumers listening on those queues that got the message will receive it. Pretty simple, right?

Not exactly. There are four different types of exchanges and each, along with the bindings, defines a routing algorithm. "Routing algorithm" means, essentially, how the messages are distributed among the queues. Going into the details about each type might be an overkill here, so I'll just expand on the one we'll be using: the topic exchange:

For an exchange to push a message onto a queue, that queue must be bound to the exchange. If we create an exchange ourselves, this would have to be done explicitly. However, when you deploy RabbitMQ it comes with a default, nameless exchange. And every queue we create will be automatically bound to this exchange. To be descriptive, I'll be creating a named exchange manually and then bind a queue to it. This binding is defined by a binding key. The exact way a binding key works, again, depends on the type of the exchange. Here's how it works with a topic exchange:


  • A queue is bound to an exchange using a string pattern (binding key)

  • The published message is delivered to the exchange along with a routing key

  • The exchange checks which queues match the routing key based on the binding key pattern defined before.

can substitute for exactly one word. # can substitute for zero or more words.



Source: https://www.rabbitmq.com/tutorials/tutorial-five-python.html

Any message with a routing key "quick.orange.rabbit" will be delivered to both queues. However, messages with "lazy.brown.fox" will only reach Q2. Those with a routing key not matching any pattern will be lost.

For some perspective, let's just skim over two other exchange types:



  • Fanout exchange: Messages sent to this kind of exchange will be sent to ALL the queues bound to it. The routing key, if provided, will be completely ignored. This can be used, for example, for broadcasting global configuration updates across a distributed system.


  • Direct exchange (simplest): Sends the message to the queue whose binding key is exactly equal to the given routing key. If there are multiple consumers listening on the queue, then the messages will be load-balanced among them, hence, it is commonly used to distribute tasks between multiple workers in a round robin manner.

My illustration will be very simple: a Python Flask app with a single POST endpoint, which, when called, will purport to update a user's info, emit a message to the RabbitMQ broker (non-blocking of course) and return a 201. A separate Go service will be listening for the message from the broker and hence have the chance to update its data accordingly. All three will be hosted on separate containers.

Setting Up Our Containerized Microservices & Broker Using Docker Compose

Provisioning a bunch of containers and all that goes along with them can be a pain, so I always rely on Docker Compose.

Here's the entire code. We're declaring three services that will be used for the three containers. The two volumes are needed for putting our code inside the containers:

# docker-compose.yml version: "3.2" 
services: rabbitmq-server: build: ./rabbitmq-server python-service: build: ./python-service # 'rabbitmq-server' will be available as a network reference inside this service # and this service will start only after the RabbitMQ service does. depends_on: - rabbitmq-server # Keep it running. tty: true # Map port 3000 on the host machine to port 3000 of the container. # This will be used to receive HTTP requests made to the service. ports: - "3000:3000" volumes: - './python-service:/python-service' go-service: build: ./go-service depends_on: - rabbitmq-server tty: true volumes: - './go-service:/go-service' # Host volumes used to store code.
volumes: python-service: go-service:

The Dockerfiles are pretty much the standard ones from Docker Hub, to which I've added:



  • /go-service working directory in the Go service container.


  • /python-service working directory in the Python service container.

  • Go's RabbitMQ client library called amqp

  • Python's RabbitMQ client Pika & Flask

Our Flask app has just one endpoint that receives a user_id and a full_name, which will be used to update the user's profile. A message indicating this update will then be sent to the RabbitMQ broker.

# main.py from flask import Flask 
from flask import request
from flask import jsonify
from services.user_event_handler import emit_user_profile_update app = Flask(name) @app.route('/users/<int:user_id>', methods=['POST'])
def update(user_id): new_name = request.form['full_name'] # Update the user in the datastore using a local transaction... emit_user_profile_update(user_id, {'full_name': new_name}) return jsonify({'full_name': new_name}), 201

The logic for emitting events to other services should always be separate from the rest of the app, so I've extracted it to a module. The exchange should be explicitly checked for and created by the publisher and consumer both, because we can't know (nor should we rely on) which service starts first. This is a good practice, which most RabbitMQ client libraries facilitate seamlessly:

# services/user_event_handler.py import pika 
import json def emit_user_profile_update(user_id, new_data): # 'rabbitmq-server' is the network reference we have to the broker, # thanks to Docker Compose. connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq-server')) channel = connection.channel() exchange_name = 'user_updates' routing_key = 'user.profile.update' # This will create the exchange if it doesn't already exist. channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True) new_data['id'] = user_id channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=json.dumps(new_data), # Delivery mode 2 makes the broker save the message to disk. # This will ensure that the message be restored on reboot even # if RabbitMQ crashes before having forwarded the message. properties=pika.BasicProperties( delivery_mode = 2, )) print("%r sent to exchange %r with data: %r" % (routing_key, exchange_name, new_data)) connection.close()

Don't get confused by channel. A channel is a virtual, lightweight connection within the TCP connection that is meant to prevent opening multiple, expensive TCP connections. Especially in multithreaded environments.

The durable parameter ensures that the exchange is persisted to the disk and can be restored if the broker crashes or goes offline for any reason. The publisher (Python service) creates an exchange named user_updates and sends it the user's updated data with user.profile.update as the routing key. This will be matched with a user.profile. binding key, which our Go service will define:

// main.go package main import ( "fmt" "log" "github.com/streadway/amqp" 
) func failOnError(err error, msg string) { if err != nil { // Exit the program. panic(fmt.Sprintf("%s: %s", msg, err)) }
} func main() { // 'rabbitmq-server' is the network reference we have to the broker, // thanks to Docker Compose. conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-server:5672/") failOnError(err, "Error connecting to the broker") // Make sure we close the connection whenever the program is about to exit. defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") // Make sure we close the channel whenever the program is about to exit. defer ch.Close() exchangeName := "userupdates" bindingKey := "user.profile.*" // Create the exchange if it doesn't already exist. err = ch.ExchangeDeclare( exchangeName, // name "topic", // type true, // durable false, false, false, nil, ) failOnError(err, "Error creating the exchange") // Create the queue if it doesn't already exist. // This does not need to be done in the publisher because the // queue is only relevant to the consumer, which subscribes to it. // Like the exchange, let's make it durable (saved to disk) too. q, err := ch.QueueDeclare( "", // name - empty means a random, unique name will be assigned true, // durable false, // delete when the last consumer unsubscribes false, false, nil, ) failOnError(err, "Error creating the queue") // Bind the queue to the exchange based on a string pattern (binding key). err = ch.QueueBind( q.Name, // queue name bindingKey, // binding key exchangeName, // exchange false, nil, ) failOnError(err, "Error binding the queue") // Subscribe to the queue. msgs, err := ch.Consume( q.Name, // queue "", // consumer id - empty means a random, unique id will be assigned false, // auto acknowledgement of message delivery false, false, false, nil, ) failOnError(err, "Failed to register as a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received message: %s", d.Body) // Update the user's data on the service's // associated datastore using a local transaction... // The 'false' indicates the success of a single delivery, 'true' would // mean that this delivery and all prior unacknowledged deliveries on this // channel will be acknowledged, which I find no reason for in this example. d.Ack(false) } }() fmt.Println("Service listening for events...") // Block until 'forever' receives a value, which will never happen. <-forever
}

RabbitMQ uses port 5672 by default for non-TLS connections and "guest" as the username & password. You can study the plethora of configuration options available and how to use them with Pika and Go amqp.

You might be wondering what this line is for: d.Ack(false)

This tells the broker that the message has been delivered, processed successfully and can be deleted. By default, these acknowledgments happen automatically. But we specified so otherwise when we subscribed to the queue: ch.Consume.

Now, if the Go service crashes (for any unforeseeable reason), the acknowledgment won't be sent and this will cause the broker to re-queue the message so that it may be given another chance to be processed.

Launching The Microservices

Alright, let's fire'em up:

Run docker-compose up

When the three services have been built (will take at least a few minutes the first time), check for their names using docker ps:

Open two new terminals, SSH into the Python and Go containers using the respective container names and start the servers:

docker exec -it microservicesusingrabbitmqpython-service1 bash

FLASK
APP=main.py python -m flask run --port 3000 --host 0.0.0.0

docker exec -it microservicesusingrabbitmqgo-service1 bash

go run main.go

Open a third terminal to send the POST request. I'll use Curl:

curl -d "fullname=usama" -X POST http://localhost:3000/users/1

And you'll see the transmission:

At any point, you may also SSH into the RabbitMQ container and just look around:



  • rabbitmqctl list
    exchanges (list all the exchanges on this broker node)


  • rabbitmqctl listqueues (list all the queues on this broker node)


  • rabbitmqctl list
    bindings (list all the bindings on this broker node)


  • rabbitmqctl listqueues name messagesready messages_unacknowledged (list all the queues with the number of messages each has that are ready to be delivered to clients but not yet delivered and those delivered but not yet acknowledged)

As I mentioned at the start, this is by no means a deep dive into microservices. There are many questions to be asked and I'll try to answer an important one: how do we make this communication transactional? So what happens if our Go service (consumer) throws an exception while updating the state on its end and we need to make sure that the update event rolls back across all the services that were affected by it?. Imagine how complicated this could get when we have several microservices and thousands of such "update events". Essentially, we'll need to incorporate separate events that perform the rollback.

In our case, if the Go service throws an exception while updating the data, it will have to send a message back to the Python service telling it to rollback the update. It's also important to note that in the case of these kinds of errors, the message delivery will have to be acknowledged (even though the processing wasn't successful), so that the message doesn't get re-queued by the broker. When writing our consumer, we'll have to decide which errors mean that the message should be re-queued (tried again) and which mean that the message should not be re-queued and just rolled back.

But how do we specify which update event to rollback and how exactly would the rollback happen? The Saga pattern along with event sourcing is widely used to ensure such data consistency.

A Few Words On Designing The Broker

Consider two things: the types of exchanges to use and how to group the exchanges.

If you need to broadcast certain kinds of messages to all the services in your system, have a look at the fanout exchange type. Then, one way to group the exchanges could be on the basis of events e.g. three fanout exchanges named user.profile.updated, user.profile.deleted, user.profile.added. This might not be what you want all the time since you might end up with too many exchanges and won’t be able to filter messages for specific consumers without creating a new exchange, queue and binding.

Another way could be to create topic exchanges in terms of entities in your system. So in our first example, user, movie, cinema etc could be entities and, say, queues bound to user could use binding keys like user.created (get message when a user is created), user.login (get message when a user has just logged in), user.roles.grant (get message telling that the user has been given an authorization role), user.notify (send the user a notification) etc.

Always use routing to filter and deliver messages to specific consumers. Writing code to discard certain incoming messages while accepting others is an anti-pattern. Consumers should only receive messages that they require.

Finally, if your needs are complex and you require that messages be filtered down to certain consumers on the basis of multiple properties, use a headers exchange.

Enjoy!


Tag cloud