Back to Nestjs

Mqtt

content/microservices/mqtt.md

latest9.5 KB
Original Source

MQTT

MQTT (Message Queuing Telemetry Transport) is an open source, lightweight messaging protocol, optimized for low latency. This protocol provides a scalable and cost-efficient way to connect devices using a publish/subscribe model. A communication system built on MQTT consists of the publishing server, a broker and one or more clients. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks.

Installation

To start building MQTT-based microservices, first install the required package:

bash
$ npm i --save mqtt

Overview

To use the MQTT transporter, pass the following options object to the createMicroservice() method:

typescript
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
});

info Hint The Transport enum is imported from the @nestjs/microservices package.

Options

The options object is specific to the chosen transporter. The <strong>MQTT</strong> transporter exposes the properties described here.

Client

Like other microservice transporters, you have <a href="https://docs.nestjs.com/microservices/basics#client">several options</a> for creating a MQTT ClientProxy instance.

One method for creating an instance is to use use the ClientsModule. To create a client instance with the ClientsModule, import it and use the register() method to pass an options object with the same properties shown above in the createMicroservice() method, as well as a name property to be used as the injection token. Read more about ClientsModule <a href="https://docs.nestjs.com/microservices/basics#client">here</a>.

typescript
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      },
    ]),
  ]
  ...
})

Other options to create a client (either ClientProxyFactory or @Client()) can be used as well. You can read about them <a href="https://docs.nestjs.com/microservices/basics#client">here</a>.

Context

In more complex scenarios, you may need to access additional information about the incoming request. When using the MQTT transporter, you can access the MqttContext object.

typescript
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(`Topic: ${context.getTopic()}`);
}

info Hint @Payload(), @Ctx() and MqttContext are imported from the @nestjs/microservices package.

To access the original mqtt packet, use the getPacket() method of the MqttContext object, as follows:

typescript
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(context.getPacket());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getPacket());
}

Wildcards

A subscription may be to an explicit topic, or it may include wildcards. Two wildcards are available, + and #. + is a single-level wildcard, while # is a multi-level wildcard which covers many topic levels.

typescript
@@filename()
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
@@switch
@Bind(Ctx())
@MessagePattern('sensors/+/temperature/+')
getTemperature(context) {
  console.log(`Topic: ${context.getTopic()}`);
}

Quality of Service (QoS)

Any subscription created with @MessagePattern or @EventPattern decorators will subscribe with QoS 0. If a higher QoS is required, it can be set globally using the subscribeOptions block when establishing the connection as follows:

typescript
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
    subscribeOptions: {
      qos: 2
    },
  },
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
    subscribeOptions: {
      qos: 2
    },
  },
});

Per-pattern QoS

You can override the MQTT subscription QoS on a per-pattern basis by providing qos in the extras field of the pattern decorator. When not specified, the global subscribeOptions.qos is used as the default.

typescript
@@filename()
@EventPattern('critical-events', { extras: { qos: 2 } })
handleCriticalEvent(@Payload() data: any) {
  // This subscription uses QoS 2
}

@EventPattern('metrics', { extras: { qos: 0 } })
handleMetrics(@Payload() data: any) {
  // This subscription uses QoS 0
}
@@switch
@Bind(Payload())
@EventPattern('critical-events', { extras: { qos: 2 } })
handleCriticalEvent(data) {
  // This subscription uses QoS 2
}

@Bind(Payload())
@EventPattern('metrics', { extras: { qos: 0 } })
handleMetrics(data) {
  // This subscription uses QoS 0
}

info Hint Per-pattern QoS configuration does not affect existing behavior. When extras.qos is not specified, the subscription uses the global subscribeOptions.qos value.

Record builders

To configure message options (adjust the QoS level, set the Retain or DUP flags, or add additional properties to the payload), you can use the MqttRecordBuilder class. For example, to set QoS to 2 use the setQoS method, as follows:

typescript
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
  .setProperties({ userProperties })
  .setQoS(1)
  .build();
client.send('replace-emoji', record).subscribe(...);

info Hint MqttRecordBuilder class is exported from the @nestjs/microservices package.

And you can read these options on the server-side as well, by accessing the MqttContext.

typescript
@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
  const { properties: { userProperties } } = context.getPacket();
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const { properties: { userProperties } } = context.getPacket();
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

In some cases you might want to configure user properties for multiple requests, you can pass these options to the ClientProxyFactory.

typescript
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1833',
            userProperties: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

Instance status updates

To get real-time updates on the connection and the state of the underlying driver instance, you can subscribe to the status stream. This stream provides status updates specific to the chosen driver. For the MQTT driver, the status stream emits connected, disconnected, reconnecting, and closed events.

typescript
this.client.status.subscribe((status: MqttStatus) => {
  console.log(status);
});

info Hint The MqttStatus type is imported from the @nestjs/microservices package.

Similarly, you can subscribe to the server's status stream to receive notifications about the server's status.

typescript
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: MqttStatus) => {
  console.log(status);
});

Listening to MQTT events

In some cases, you might want to listen to internal events emitted by the microservice. For example, you could listen for the error event to trigger additional operations when an error occurs. To do this, use the on() method, as shown below:

typescript
this.client.on('error', (err) => {
  console.error(err);
});

Similarly, you can listen to the server's internal events:

typescript
server.on<MqttEvents>('error', (err) => {
  console.error(err);
});

info Hint The MqttEvents type is imported from the @nestjs/microservices package.

Underlying driver access

For more advanced use cases, you may need to access the underlying driver instance. This can be useful for scenarios like manually closing the connection or using driver-specific methods. However, keep in mind that for most cases, you shouldn't need to access the driver directly.

To do so, you can use the unwrap() method, which returns the underlying driver instance. The generic type parameter should specify the type of driver instance you expect.

typescript
const mqttClient = this.client.unwrap<import('mqtt').MqttClient>();

Similarly, you can access the server's underlying driver instance:

typescript
const mqttClient = server.unwrap<import('mqtt').MqttClient>();