Message Queue

To let microservices communicate asynchronously, SMS lets you send messages between them through a message queue.

The default message queue is RabbitMQ, but with a bit of work you can use any message queue you want.

Set up

The message queue must be configured before you can use it. Open the RabbitMQ management interface on the message queue host (port 15672 by default) and complete the following steps:

  • In the Queues section, create a new queue (called my_queue in the examples).
  • In the Exchanges section, create a new exchange (called my_exchange in the examples).
  • In the Exchanges section, create a binding between the queue and the exchange (the examples bind my_queue to my_exchange).
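
The same three steps can also be performed from code instead of through the management interface. The following is only a sketch of one way to do that, using the official RabbitMQ Java client (com.rabbitmq.client) directly from Scala; it is not part of SMS. The object name DeclareTopology is hypothetical, and the durable direct exchange and the empty routing key are assumptions, since neither is specified here. Connection settings are read from the RABBITMQ_* environment variables described in the next section, falling back to the default values shown there.

```scala
import com.rabbitmq.client.ConnectionFactory

// Hypothetical helper, not part of SMS: declares the example queue, exchange
// and binding over AMQP instead of through the management interface.
object DeclareTopology {

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost(sys.env.getOrElse("RABBITMQ_HOST", "localhost"))
    factory.setPort(sys.env.getOrElse("RABBITMQ_PORT", "5672").toInt)
    factory.setUsername(sys.env.getOrElse("RABBITMQ_USER", "guest"))
    factory.setPassword(sys.env.getOrElse("RABBITMQ_PASSWORD", "guest"))

    val connection = factory.newConnection()
    try {
      val channel = connection.createChannel()
      // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
      channel.queueDeclare("my_queue", true, false, false, null)
      // Assumption: a durable direct exchange; the exchange type is not specified in this document.
      channel.exchangeDeclare("my_exchange", "direct", true)
      // Assumption: an empty routing key, since the producer example does not pass one.
      channel.queueBind("my_queue", "my_exchange", "")
    } finally connection.close() // closing the connection also closes its channels
  }
}
```

Declaring a queue or exchange that already exists with the same properties is a no-op, so the sketch can be run repeatedly. If the entity already exists with different properties (for example, non-durable), RabbitMQ rejects the declaration with a channel error.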

Environment variables

Set the following environment variables to connect to the message queue:

export RABBITMQ_HOST=localhost;
export RABBITMQ_PORT=5672;
export RABBITMQ_USER=guest;
export RABBITMQ_PASSWORD=guest;
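
To illustrate how these four values fit together, the sketch below reads them into a small configuration object and assembles the corresponding AMQP connection URI. RabbitConfig and fromEnv are hypothetical names introduced for this example only; how SMS itself reads the variables is not shown in this document.

```scala
import java.net.URI

// Hypothetical holder for the four RABBITMQ_* settings; not part of SMS.
final case class RabbitConfig(host: String, port: Int, user: String, password: String) {
  // Builds a URI of the form amqp://user:password@host:port
  def uri: URI = new URI("amqp", s"$user:$password", host, port, null, null, null)
}

object RabbitConfig {
  // Takes the environment as a plain Map so it can be exercised without
  // touching the real process environment; defaults to sys.env.
  def fromEnv(env: Map[String, String] = sys.env): Either[String, RabbitConfig] =
    for {
      host     <- env.get("RABBITMQ_HOST").toRight("RABBITMQ_HOST is not set")
      portStr  <- env.get("RABBITMQ_PORT").toRight("RABBITMQ_PORT is not set")
      port     <- portStr.toIntOption.toRight(s"RABBITMQ_PORT is not a number: $portStr")
      user     <- env.get("RABBITMQ_USER").toRight("RABBITMQ_USER is not set")
      password <- env.get("RABBITMQ_PASSWORD").toRight("RABBITMQ_PASSWORD is not set")
    } yield RabbitConfig(host, port, user, password)
}
```

Returning an Either makes a missing or malformed variable visible at startup as a readable message, rather than as a connection failure later on.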

Producer

You can send messages to the message queue using the following method:

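// Imports assume the zio-amqp library (nl.vroste.zio.amqp), which the
// Channel and ExchangeName types used here appear to come from.
import nl.vroste.zio.amqp._
import nl.vroste.zio.amqp.model._
import zio._

object ExampleProducer {

  // Publishes a fresh UUID to "my_exchange" once per second.
  // The schedule never ends, so this effect runs until it is interrupted.
  def produce(channel: Channel): ZIO[Any, Throwable, Unit] =
    Random.nextUUID
      .flatMap(uuid =>
        channel.publish(ExchangeName("my_exchange"), uuid.toString.getBytes)
      )
      .schedule(Schedule.spaced(1.seconds))
      .unit
}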

This effect generates a new UUID and publishes it to the message queue once every second, indefinitely.

"my_exchange" can be replaced with the name of any exchange you want to publish to.

Listener

To make the message queue easier to use, SMS provides a Listener trait that you can extend to consume messages from a queue.

Here is an example of how to use it:

object ExampleListener extends Listener {
 
  val queueName = "my_queue"
  val consumerName = "my_consumer"
 
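  // Called for every delivery: print the payload, then acknowledge it
  // so that RabbitMQ can discard the message.
  val callback = (record, channel) => {
    val deliveryTag = record.getEnvelope.getDeliveryTag
    Console.printLine(
      s"Received $deliveryTag: ${new String(record.getBody)}"
    ) *>
      channel.ack(DeliveryTag(deliveryTag))
  }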
}

Parameters

queueName is the queue your consumer listens to. consumerName is the name of your consumer; it only identifies the consumer in the RabbitMQ management interface, so it is not very useful otherwise.

Callback

The callback is a function that will be called every time a message is received from the queue.

It takes two parameters:

  • The record that was received, which contains the message body and some metadata.
  • The channel, which is used to acknowledge the message.