server/README-MQTT.md
MQTT Implementation Overview
Revision 1.1
Authors: Ivan Kozlovic, Lev Brouk
NATS Server currently supports most of MQTT 3.1.1. This document describes how it is implemented.
It is strongly recommended to review the MQTT v3.1.1 specifications and get a detailed understanding before proceeding with this document.
In the MQTT specification there are concepts of Client and Server, used
somewhat interchangeably with those of Sender and Receiver. A Server
acts as a Receiver when it gets PUBLISH messages from a Sender
Client, and acts as a Sender when it delivers them to subscribed
Clients.
In the NATS server implementation there are also concepts (types) server and
client. client is an internal representation of a (connected) client and
runs its own read and write loops. Both of these have an mqtt field that if
set makes them behave as MQTT-compliant.
The code and comments can be confusing because they use server and client for
both the MQTT and the NATS concepts, sometimes ambiguously.
When an MQTT client connects to a server, it must send a CONNECT packet to
create an MQTT Connection. The packet must include a Client Identifier.
The server will then create or load a previously saved Session for the (hash
of the) client ID.
The low level unit of transmission in MQTT is a Packet. Examples of packets
are: CONNECT, SUBSCRIBE, SUBACK, PUBLISH, PUBCOMP, etc.
An MQTT Message starts with a PUBLISH packet that a client sends to the
server. It is then matched against the current MQTT Subscriptions and is
delivered to them as appropriate. During the message delivery the server acts as
an MQTT client, and the receiver acts as an MQTT server.
Internally we use NATS Messages and NATS Subscriptions to facilitate
message delivery. This may be somewhat confusing as the code refers to msg and
sub. What may be even more confusing is that some MQTT packets (specifically,
PUBREL) are represented as NATS messages, and that the original MQTT packet
"metadata" may be encoded as NATS message headers.
MQTT specifies 3 levels of quality of service (QoS):
- 0 for at most once. A single delivery attempt.
- 1 for at least once. Will try to redeliver until acknowledged by the receiver.
- 2 for exactly once. See the [SPEC REF] for the acknowledgement flow.

QoS 1 and 2 messages need to be identified with publish identifiers (PIs). A PI is a 16-bit integer that must uniquely identify a message for the duration of the required exchange of acknowledgment packets.
Note that the QoS applies separately to the transmission of a message from a sender client to the server, and from the server to the receiving client. There are no protocol-level acknowledgements between the receiver and the original sender. The sender passes the ownership of messages to the server, and the server then delivers them at the maximum possible QoS to the receivers (subscribers). The PIs for in-flight outgoing messages are issued and stored per session.
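The per-session issuing of PIs can be pictured with a small allocator. This is a minimal sketch under stated assumptions, not the server's trackPublish() or bumpPI(): the piAllocator type and its methods are hypothetical names, and the only protocol fact relied on is that a PI is a non-zero 16-bit value that must not be reused while still in flight.

```go
package main

import "fmt"

// piAllocator hands out 16-bit identifiers for one session.
// Hypothetical type for illustration; not the server's actual structure.
type piAllocator struct {
	last    uint16
	pending map[uint16]struct{} // PIs currently in flight
}

func newPIAllocator() *piAllocator {
	return &piAllocator{pending: make(map[uint16]struct{})}
}

// next returns an unused non-zero PI, or 0 when all 65535 values are in flight.
func (a *piAllocator) next() uint16 {
	if len(a.pending) >= 65535 {
		return 0
	}
	for {
		a.last++ // uint16 arithmetic wraps from 65535 to 0
		if a.last == 0 {
			continue // 0 is not a valid identifier
		}
		if _, used := a.pending[a.last]; !used {
			a.pending[a.last] = struct{}{}
			return a.last
		}
	}
}

// release frees a PI once the acknowledgement exchange is complete.
func (a *piAllocator) release(pi uint16) {
	delete(a.pending, pi)
}

func main() {
	a := newPIAllocator()
	first, second := a.next(), a.next()
	a.release(first)
	fmt.Println(first, second, a.next())
}
```

The allocator keeps moving forward rather than immediately reusing a released value, which makes a late acknowledgement for an old PI less likely to be confused with a new message.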
A Retained Message is not part of any MQTT session and is not removed when the session that produced it goes away. Instead, the server needs to persist a single retained message per topic. When a subscription is started, the server needs to send the “matching” retained messages, that is, messages that would have been delivered to the new subscription had that subscription been running before the message was published.
Retained messages are removed when the server receives a retained message with an empty body. Still, this retained message that serves as a “delete” of a retained message will be processed as a normal published message.
Retained messages can have QoS.
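The "one retained message per topic, empty body deletes" rule can be sketched with a plain map. This is an illustration only: retainedStore and its methods are hypothetical, and the server itself persists retained messages in JetStream rather than in a map like this.

```go
package main

import "fmt"

// retainedStore is a hypothetical in-memory stand-in for retained message
// storage: at most one message per topic.
type retainedStore struct {
	byTopic map[string][]byte
}

func newRetainedStore() *retainedStore {
	return &retainedStore{byTopic: make(map[string][]byte)}
}

// publish applies a PUBLISH that has the retain flag set. An empty body
// removes the retained message for the topic; anything else replaces it.
func (s *retainedStore) publish(topic string, body []byte) {
	if len(body) == 0 {
		delete(s.byTopic, topic)
		return
	}
	s.byTopic[topic] = append([]byte(nil), body...) // keep a private copy
}

func main() {
	s := newRetainedStore()
	s.publish("sensors/temp", []byte("21.5"))
	s.publish("sensors/temp", []byte("22.0")) // replaces the previous message
	fmt.Printf("%s\n", s.byTopic["sensors/temp"])

	s.publish("sensors/temp", nil) // empty body acts as a delete
	_, ok := s.byTopic["sensors/temp"]
	fmt.Println(ok)
}
```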
The CONNECT packet can contain information about a Will Message that needs to
be sent to any client subscribing on the Will topic/subject in the event that
the client is disconnected implicitly, that is, not as a result of the client
sending the DISCONNECT packet.
Will messages can have the retain flag and QoS.
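The MQTT implementation relies heavily on JetStream. We use it to:

- Persist and restore Session state.
- Store and retrieve Retained Messages.
- Persist QoS 1 and 2 messages for delivery, and redelivery if needed.
- Store and deduplicate incoming QoS 2 messages.
- Persist and redeliver outgoing QoS 2 PUBREL packets.

Here is the overview of how we set up and use JetStream streams, consumers, and internal NATS subscriptions.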
All interactions with JetStream are performed via mqttJSA that sends NATS
requests to JetStream. Most are processed synchronously and await a response,
some (e.g. jsa.sendAck()) are sent asynchronously. JetStream API is usually
referred to as jsa in the code. No special locking is required to use jsa,
however the asynchronous use of JetStream may create race conditions with
delivery callbacks.
We create the following streams unless they already exist. Failing to ensure the streams would prevent the client from connecting.
Each stream is created with a replica value that is determined by the size of the cluster but limited to 3. It can also be overridden by the stream_replicas option in the MQTT configuration block.
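The replica rule can be written down as a small function. This is a sketch of the rule as stated above, not the server's code: streamReplicas is a hypothetical name, and treating a positive override as taking precedence over the cap of 3 is an assumption.

```go
package main

import "fmt"

// streamReplicas returns the replica count for an MQTT stream.
// clusterSize is the number of servers in the cluster; override models the
// stream_replicas option, with 0 meaning "not set".
// Assumption: a positive override is used as-is.
func streamReplicas(clusterSize, override int) int {
	if override > 0 {
		return override
	}
	switch {
	case clusterSize < 1:
		return 1
	case clusterSize > 3:
		return 3
	default:
		return clusterSize
	}
}

func main() {
	for _, size := range []int{1, 3, 5} {
		fmt.Printf("cluster size %d: %d replicas\n", size, streamReplicas(size, 0))
	}
	fmt.Printf("cluster size 5 with override 1: %d replicas\n", streamReplicas(5, 1))
}
```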
The streams are created the first time an Account Session Manager is initialized
and are used by all sessions in it. Note that to avoid race conditions, some
subscriptions are created first. The streams are never deleted. See
mqttCreateAccountSessionManager() for details.
- $MQTT_sess stores persisted Session records. It filters on the $MQTT.sess.> subject and has a “limits” policy with a MaxMsgsPer setting of 1.
- $MQTT_msgs is used for QoS 1 and 2 message delivery. It filters on the $MQTT.msgs.> subject and has an “interest” policy.
- $MQTT_rmsgs stores Retained Messages. They are all stored (and filtered) on a single subject, $MQTT.rmsg. This stream has a “limits” policy.
- $MQTT_qos2in stores and deduplicates Incoming QoS 2 Messages. It filters on $MQTT.qos2.in.> and has a “limits” policy with a MaxMsgsPer of 1.
- $MQTT_out stores Outgoing QoS 2 PUBREL packets. It filters on $MQTT.out.> and has an “interest” retention policy.

At the account level, we create:

- A durable consumer for retained messages, $MQTT_rmsgs_<server name hash>.
- Subscriptions to support retained messages: $MQTT.sub.<nuid> for the messages themselves, and one to receive replies to the "delete retained message" JS API (on the JS reply subject var).

When a new QoS 2 MQTT subscription is detected in a session, we ensure that
there is a durable consumer for QoS 2 PUBRELs out for delivery,
$MQTT_PUBREL_<session id hash>.
For all MQTT subscriptions, regardless of their QoS, we create internal NATS subscriptions to:

- subject (directly encoded from topic). This subscription is used to deliver QoS 0 messages, and messages originating from NATS.
- subject fwc, if needed. It complements subject for topics like topic.# to include topic itself, see top-level wildcards.

For QoS 1 or 2 MQTT subscriptions we ensure:

- A durable consumer named <session ID hash>_<nuid>.
- An internal subscription on $MQTT.sub.<nuid> to deliver the messages to the receiving client.

As indicated before, for a QoS 1 or QoS 2 subscription, the server will create a JetStream consumer with the appropriate subject filter. If the subscription already existed, then only the NATS subscription is created for the JetStream consumer’s delivery subject.
Note that JS consumers can be created with a “Replicas” override, which, according to recent discussion, is problematic with “Interest” policy streams such as “$MQTT_msgs”.
We do handle situations where a subscription on the same subject filter is sent with a different QoS, as per the MQTT specification. If the existing subscription was on QoS 1 or 2 and the new one is for QoS 0, we delete the existing JS consumer.
Subscriptions that are QoS 0 have a NATS subscription with the callback function
mqttDeliverMsgCbQoS0(), while QoS 1 and 2 subscriptions have a NATS subscription
with the callback mqttDeliverMsgCbQoS12(). Both functions have comments that
describe the reason for their existence and what they are doing. For instance,
the mqttDeliverMsgCbQoS0() callback will reject any producing client that is
of type JETSTREAM, so that it never handles the JetStream-delivered (QoS 1 and 2) messages.
Both functions end up calling mqttDeliver(), which first enqueues the buffer of possible retained messages before delivering any new message. The message being delivered is serialized in MQTT format and enqueued to the client’s outbound buffer, and a call to addToPCD is made so that it is flushed out of the readloop.
An MQTT connection is created when a listening MQTT server receives a CONNECT
packet. See mqttProcessConnect(). A connection is associated with a session.
Steps:

1. Ensure there is an AccountSessionManager so we can have an mqttSession. Lazily initialize JetStream streams, and internal consumers and subscriptions. See getOrCreateMQTTAccountSessionManager().
2. Close any existing connection that uses the same client ID. See mqttProcessConnect().
3. Ensure there is an mqttSession: create a new one or load a previously persisted one. If the clean flag is set in CONNECT, clean the session. See mqttSession.clear().
4. Send back a CONNACK packet. If there were errors in previous steps, include the error.

An MQTT connection can be closed for a number of reasons, including receiving a
DISCONNECT from the client, explicit internal errors processing MQTT packets,
or the server receiving another CONNECT packet with the same client ID. See
mqttHandleClosedClient() and mqttHandleWill(). Steps:

1. Send the Will Message, if any (unless the close was caused by a DISCONNECT packet). On an explicit disconnect, that is, when the client sends the DISCONNECT packet, the server will NOT send the Will, as per specifications.
2. For sessions that had the “clean” flag, delete the JS consumers corresponding to QoS 1 and 2 subscriptions through JS API calls, then delete the session record (based on recorded stream sequence) from the “$MQTT_sess” stream.
3. Finally, close the client connection.
Sessions are persisted on disconnect, and on subscription changes.
Receiving an MQTT SUBSCRIBE packet creates new subscriptions, or updates
existing subscriptions in a session. Each SUBSCRIBE packet may contain several
specific subscriptions (topic + QoS in each). We always respond with a
SUBACK, which may indicate which subscriptions errored out.
For each subscription in the packet, we:

- … topic starts with $MQTT.sub..
- … topic.
- … topic, once as QoS 0.
- … topic, update its QoS

When a session is restored (no clean flag), we go through the same steps to re-subscribe to its stored subscriptions, except step #8 which would have been redundant.
When we get an UNSUBSCRIBE packet, it can contain multiple subscriptions to
unsubscribe. The parsing will generate a slice of mqttFilter objects that
contain the “filter” (the topic with possibly wildcard of the subscription) and
the QoS value. The server goes through the list and deletes the JS consumer (if
QoS 1 or 2) and unsubscribes the NATS subscription for the delivery subject (if
it was a QoS 1 or 2) or on the actual topic/subject. In case of the “#”
wildcard, the server will handle the “level up” subscriptions that NATS had to
create.
Again, we update the session and persist it as needed in the $MQTT_sess
stream.
Detect an incoming PUBLISH packet, parse and check the message QoS. Fill out
the session's mqttPublish struct that contains information about the
published message (see mqttParse(), mqttParsePub()).

Process the message according to its QoS (see mqttProcessPub()):

- QoS 0: initiate message delivery.
- QoS 1: initiate message delivery and send back a PUBACK.
- QoS 2:
  - Store the message in the $MQTT_qos2in stream, using a PI-specific subject. Since MaxMsgsPer is set to 1, we will ignore duplicates on the PI.
  - Send back a PUBREC.
  - "Wait" for a PUBREL, then initiate message delivery.
  - Send back a PUBCOMP.

Initiate message delivery (see mqttInitiateMsgDelivery()):

- Convert the MQTT topic into a NATS subject using the mqttTopicToNATSPubSubject() function. If there is a known subject mapping, then we select the new subject using the selectMappedSubject() function and then convert this subject back into an MQTT topic using the natsSubjectToMQTTTopic() function.
- Re-serialize the PUBLISH packet received as a NATS message. Use NATS headers for the metadata, and the deliverable MQTT PUBLISH packet as the contents.
- Publish the message on subject (and subject fwc if applicable, see subject wildcards). Use the "standard" NATS c.processInboundClientMsg() to do that. processInboundClientMsg() will distribute the message to any NATS subscriptions (including routes, gateways, leafnodes) and the relevant MQTT subscriptions.
- Handle retained messages. See c.processInboundClientMsg() calling c.mqttHandlePubRetain() for MQTT clients.
- If the QoS is 1 or 2, store the message in the $MQTT_msgs stream as $MQTT.msgs.<subject> for "at least once" delivery with retries.

Let NATS and JetStream deliver to the internal subscriptions, and to the
receiving clients. See mqttDeliverMsgCb...()
The NATS message posted to subject (and subject fwc) will be delivered
to each relevant internal subscription by calling mqttDeliverMsgCbQoS0().
The function has access to both the publishing and the receiving clients. It will:

- Skip messages that need to be delivered with a higher QoS; those are handled by the other, ...QoS12 callback. Note that if the original message was published with a QoS 1 or 2, but the subscription has its maximum QoS set to 0, the message will be delivered by this callback.
- Decode the delivery topic from the NATS subject.
- Enqueue the outgoing PUBLISH packet.

The NATS message posted to JetStream as $MQTT.msgs.<subject> will be
consumed by subscription-specific consumers. Note that MQTT subscriptions
with max QoS 0 do not have JetStream consumers. They are handled by the
QoS0 callback.

The consumers will deliver it to the $MQTT.sub.<nuid>
subject for their respective NATS subscriptions by calling
mqttDeliverMsgCbQoS12(). This callback too has access to both the
publishing and the receiving clients. It will:

- Check sess.cpending for the JS reply subject to see whether this is a redelivery from JetStream. If so, use the existing PI and treat this as a duplicate redelivery.
- Otherwise, assign the message a new PI (see trackPublish() and bumpPI()) and store it in sess.cpending and sess.pendingPublish, along with the JS reply subject that can be used to remove this pending message from the consumer once it's delivered to the recipient.
- Decode the delivery topic from the NATS subject.
- Enqueue the outgoing PUBLISH packet.

QoS 1: "Wait" for a PUBACK. See mqttProcessPubAck().
QoS 2: "Wait" for a PUBREC. When received, we need to do all the same
things as in the QoS 1 PUBACK case, but we need to send out a PUBREL, and
continue using the same PI until the delivery flow is complete and we get
back a PUBCOMP. For that, we add the PI to sess.pendingPubRel, and to
sess.cpending with the PubRel consumer durable name.
We also compose a headers-only NATS message signifying a PUBREL
out for delivery, and store it in the $MQTT_out stream, on a
session-specific subject under $MQTT.out.>.
QoS 2: Deliver PUBREL. The PubRel session-specific consumer will publish to
internal subscription on $MQTT.qos2.delivery, calling
mqttDeliverPubRelCb(). We store the ACK reply subject in cpending to
remove the JS message on PUBCOMP, compose and send out a PUBREL packet.
QoS 2: "Wait" for a PUBCOMP. See mqttProcessPubComp(). When received, send an
ACK to the consumer to remove the stored PUBREL message.

When we process an inbound PUBLISH and submit it to the
processInboundClientMsg() function, for MQTT clients it will invoke
mqttHandlePubRetain() which checks if the published message is “retained” or
not.
If it is, then we construct a record representing the retained message and store
it in the $MQTT_rmsgs stream, under the single $MQTT.rmsg subject. The stored
record (in JSON) contains information about the subject, topic, MQTT flags, the
user that produced this message, and the message content itself. Once it is
stored, the stream sequence is remembered in the in-memory structure that holds
retained messages.
Note that when creating an account session manager, the retained messages stream is read from scratch to load all the messages through the use of a JS consumer. The associated subscription will process the recovered retained messages and any new ones that come from the network.
A retained message is added to a map and a subscription is created and inserted into a sublist that will be used to perform a ReverseMatch() when a subscription is started and we want to find all retained messages that the subscription would have received if it had been running prior to the message being published.
If a retained message on topic “foo” already exists, then the server has to delete the old message at the stream sequence we saved when storing it.
This could have been done by storing retained messages under
$MQTT.rmsg.<subject>, as opposed to all under a single subject, and using
the MaxMsgsPer field set to 1. The MaxMsgsPer option was introduced well into
the availability of MQTT. Changes to the sessions were made in PR
#2501, with a conversion of
existing streams such as $MQTT_sess_<sess ID> into a single stream with unique
subjects, but the changes were not made to the retained messages stream.
There are also subscriptions for the handling of retained messages which are messages that are asked by the publisher to be retained by the MQTT server to be delivered to matching subscriptions when they start. There is a single message per topic. Retained messages are deleted when the user sends a retained message (there is a flag in the PUBLISH protocol) on a given topic with an empty body. The difficulty with retained messages is to handle them in a cluster since all servers need to be aware of their presence so that they can deliver them to subscriptions that those servers may become the leader for.
The $MQTT_rmsgs stream has a “limits” policy and holds retained messages, all
under the single $MQTT.rmsg subject. Not sure why I did not use MaxMsgsPer for
this stream with a $MQTT.rmsg.> filter.

The first step when processing a new subscription is to gather the retained
messages that would be a match for this subscription. To do so, the server will
serialize into a buffer all messages for the account session manager’s sublist’s
ReverseMatch result. We use the returned subscriptions’ subject to find the
appropriate retained message in a map (see serializeRetainedMsgsForSub() for
details).
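The idea of finding the retained messages a new subscription would have received can be sketched as follows. Note the substitution: this sketch does a linear scan over a map with a hand-written wildcard matcher, whereas the server uses a sublist and ReverseMatch(). The names subjectMatches and matchingRetained are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// subjectMatches reports whether a literal NATS subject matches a filter
// that may contain "*" (exactly one token) and a trailing ">" (one or more
// tokens).
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to match.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

// matchingRetained returns, in sorted order, the subjects of the retained
// messages that a subscription on filter would have received.
func matchingRetained(filter string, retained map[string][]byte) []string {
	var out []string
	for subj := range retained {
		if subjectMatches(filter, subj) {
			out = append(out, subj)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	retained := map[string][]byte{
		"foo":         []byte("a"),
		"foo.bar":     []byte("b"),
		"foo.bar.baz": []byte("c"),
	}
	fmt.Println(matchingRetained("foo.*", retained))
	fmt.Println(matchingRetained("foo.>", retained))
}
```

A linear scan is fine for illustration, but its cost grows with the number of retained messages on every new subscription, which is one reason to index them in a sublist instead.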
The MQTT accept loop is started when the server detects that an MQTT port has
been defined in the configuration file. It works similarly to all other accept
loops. Note that for MQTT over websocket, the websocket port has to be defined
and MQTT clients will connect to that port instead of the MQTT port and need to
provide /mqtt as part of the URL to redirect the creation of the client to an
MQTT client (with websocket support) instead of a regular NATS with websocket.
See the branching done in startWebsocketServer(). See startMQTT().
When a TCP connection is accepted, the internal goroutine will invoke
createMQTTClient(). This function will set a c.mqtt object that will make it
become an MQTT client (through the isMqtt() helper function). The readLoop()
and writeLoop() are started similarly to other clients. However, the read loop
will branch out to mqttParse() instead when detecting that this is an MQTT
client.
mqttAccountSessionManager is an object that holds the state of all sessions in
an account. It also manages the lifecycle of JetStream streams and internal
subscriptions for processing JS API replies, session updates, etc. See
mqttCreateAccountSessionManager(). It is lazily initialized upon the first
MQTT CONNECT packet received. Account session manager is referred to as asm
in the code.
Note that creating the account session manager (and attempting to create the streams) is done only once per account on a given server, since once created the account session manager for a given account would be found in the sessions map of the mqttSessionManager object.
Once all that is done, we now go to the creation of the session object itself. For that, we first need to make sure that it does not already exist, meaning that it is registered on the server - or anywhere in the cluster. Note that MQTT dictates that if a session with the same ID connects, the OLD session needs to be closed, not the new one being created. NATS Server complies with this requirement.
Once a session is detected to already exist, the old one (as described above) is closed and the new one accepted. However, the session ID is maintained in a flappers map so that we detect situations where sessions with the same ID are started multiple times, each causing the previous one to be closed. When that detection occurs, the newly created session is put in “jail” for a second to avoid a very rapid succession of connect/disconnect. Users have hit this situation: an earlier issue, where we would schedule the connection close instead of waiting in place, caused a panic.
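The flapper detection can be illustrated with a small time-based map. This is a sketch under assumptions, not the server's implementation: flapDetector, takeover, at and jailWindow are hypothetical names, and using a one-second detection window is a guess based on the one-second jail mentioned above.

```go
package main

import (
	"fmt"
	"time"
)

// jailWindow is an assumed detection window for repeated takeovers.
const jailWindow = time.Second

// flapDetector remembers when a client ID last displaced an existing session.
type flapDetector struct {
	window time.Duration
	seen   map[string]time.Time
}

func newFlapDetector(window time.Duration) *flapDetector {
	return &flapDetector{window: window, seen: make(map[string]time.Time)}
}

// takeover records that a new connection for clientID displaced an existing
// one at time now, and reports whether the new connection should be jailed
// because the previous takeover happened within the window.
func (d *flapDetector) takeover(clientID string, now time.Time) bool {
	last, ok := d.seen[clientID]
	d.seen[clientID] = now
	return ok && now.Sub(last) < d.window
}

// at builds a deterministic timestamp ms milliseconds after a fixed origin.
func at(ms int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(ms) * time.Millisecond)
}

func main() {
	d := newFlapDetector(jailWindow)
	fmt.Println(d.takeover("client-1", at(0)))    // first takeover
	fmt.Println(d.takeover("client-1", at(100)))  // again 100ms later
	fmt.Println(d.takeover("client-1", at(5000))) // again much later
}
```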
We also protect from multiple clients on a given server trying to connect with the same ID at the “same time” while the processing of a CONNECT of a session is not yet finished. This is done with the use of a sessLocked map, keyed by the session ID.
If everything is good up to that point, the server will either create or restore
a session from the stream. This is done in the createOrRestoreSession()
function. The client/session ID is hashed and added to the session’s stream
subject along with the JS domain, to prevent clients connecting from different
domains from “polluting” the session stream of a given domain.
Since each session constitutes a subject and the stream has a maximum of 1 message per subject, we attempt to load the last message on the formed subject. If we don’t find it, then the session object is created “empty”, while if we find a record, we create the session object based on the record persisted on the stream.
If the session was restored from the JS stream, we keep track of the stream
sequence where the record was located. When we save the session (even if it
already exists) we use this sequence number to set the
JSExpectedLastSubjSeq header. This lets different servers in a
(super)cluster detect the race of clients trying to use the same session ID,
since only one of the writes should succeed. On success, the session’s new
sequence is remembered by the server that did the write.
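The effect of writing with an expected last sequence can be shown with an in-memory stand-in for the stream. This is a sketch of the optimistic-concurrency idea only: sessionStream, store and errWrongLastSeq are hypothetical, the subject is illustrative, and no JetStream API is called.

```go
package main

import (
	"errors"
	"fmt"
)

var errWrongLastSeq = errors.New("wrong last sequence for subject")

// sessionStream is a hypothetical in-memory stand-in for a stream that
// tracks the last sequence written on each subject.
type sessionStream struct {
	seq  uint64            // stream-wide sequence counter
	last map[string]uint64 // subject -> sequence of its latest record
}

func newSessionStream() *sessionStream {
	return &sessionStream{last: make(map[string]uint64)}
}

// store writes a record on subject only if the subject's current last
// sequence equals expectedLast (0 means "no record yet"). It returns the
// sequence assigned to the new record.
func (s *sessionStream) store(subject string, expectedLast uint64) (uint64, error) {
	if s.last[subject] != expectedLast {
		return 0, errWrongLastSeq
	}
	s.seq++
	s.last[subject] = s.seq
	return s.seq, nil
}

func main() {
	st := newSessionStream()
	subj := "$MQTT.sess.abc123" // illustrative subject, not the real layout
	seq, _ := st.store(subj, 0)

	// Two servers both restored the session at seq and race to save it.
	newSeq, errA := st.store(subj, seq)
	_, errB := st.store(subj, seq)
	fmt.Println(newSeq, errA)
	fmt.Println(errB)
}
```

The losing writer learns about the conflict from the error instead of silently overwriting the winner's record, which is what makes the race detectable across servers.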
When created or restored, the CONNACK can now be sent back to the client, and if there were any recovered subscriptions, they are now processed.
When the server delivers a message with QoS 1 or 2 (also a PUBREL for QoS 2) to a subscribed client, the client will send back an acknowledgement. See mqttProcessPubAck(), mqttProcessPubRec(), and mqttProcessPubComp()
While the specific logic for each packet differs, these handlers all update the
session's PI mappings (cpending, pendingPublish, pendingPubRel), and if
needed send an ACK to JetStream to remove the message from its consumer and stop
the re-delivery attempts.
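The PI bookkeeping around delivery and acknowledgement can be sketched as two maps. The field names cpending and pendingPublish come from the text above, but their shapes here are simplified assumptions (keyed directly by a reply-subject string), and track and ack are hypothetical method names.

```go
package main

import "fmt"

// session models only the PI bookkeeping. The map shapes are simplified
// assumptions for illustration.
type session struct {
	lastPI         uint16
	cpending       map[string]uint16 // JS reply subject -> PI
	pendingPublish map[uint16]string // PI -> JS reply subject
}

func newSession() *session {
	return &session{
		cpending:       make(map[string]uint16),
		pendingPublish: make(map[uint16]string),
	}
}

// track returns the PI to use for a delivery identified by its JS reply
// subject. dup is true when the delivery is already pending, in which case
// the existing PI is reused.
func (s *session) track(jsReply string) (pi uint16, dup bool) {
	if existing, ok := s.cpending[jsReply]; ok {
		return existing, true
	}
	// Simplified: a complete allocator would also skip 0 and PIs in use.
	s.lastPI++
	s.cpending[jsReply] = s.lastPI
	s.pendingPublish[s.lastPI] = jsReply
	return s.lastPI, false
}

// ack handles an acknowledgement for pi: it forgets the PI and returns the
// JS reply subject that should be ACKed to stop redelivery.
func (s *session) ack(pi uint16) (jsReply string, ok bool) {
	jsReply, ok = s.pendingPublish[pi]
	if !ok {
		return "", false
	}
	delete(s.pendingPublish, pi)
	delete(s.cpending, jsReply)
	return jsReply, true
}

func main() {
	s := newSession()
	pi, dup := s.track("reply.1")
	fmt.Println(pi, dup)
	pi, dup = s.track("reply.1") // redelivery of the same message
	fmt.Println(pi, dup)
	reply, ok := s.ack(pi)
	fmt.Println(reply, ok)
}
```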
Note that MQTT subscriptions have wildcards too, the “+” wildcard is equivalent
to NATS’s “*” wildcard, however, MQTT’s wildcard “#” is similar to “>”, except
that it also includes the level above. That is, a subscription on “foo/#” would
receive messages on “foo/bar/baz”, but also on “foo”.
So, for MQTT subscriptions ending with a '#' we are forced to create 2
internal NATS subscriptions, one on “foo” and one on “foo.>”.
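The wildcard mapping above can be sketched as a conversion from an MQTT topic filter to the NATS subjects to subscribe on. This is a simplified illustration with a hypothetical filterSubjects function: it assumes well-formed filters and ignores cases a real conversion must handle, such as empty levels or characters like '.' and spaces inside topic levels.

```go
package main

import (
	"fmt"
	"strings"
)

// filterSubjects converts an MQTT topic filter into the NATS subjects needed
// to cover it. "+" becomes "*" and "#" becomes ">". A filter ending in "/#"
// yields two subjects, because "#" also includes the level above.
func filterSubjects(filter string) []string {
	levels := strings.Split(filter, "/")
	for i, l := range levels {
		switch l {
		case "+":
			levels[i] = "*"
		case "#":
			levels[i] = ">"
		}
	}
	subject := strings.Join(levels, ".")
	n := len(levels)
	if n > 1 && levels[n-1] == ">" {
		parent := strings.Join(levels[:n-1], ".")
		return []string{parent, subject}
	}
	return []string{subject}
}

func main() {
	for _, f := range []string{"foo/#", "foo/+/bar", "foo/bar"} {
		fmt.Printf("%s -> %v\n", f, filterSubjects(f))
	}
}
```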
- … $MQTT_msgs with $MQTT_out.
- … $MQTT.rmsg.> and MaxMsgsPer for retained messages.