docs/ts/runtime/pubsub.mdx
Msg extends object
new Subscription<Msg>(topic, name, cfg): Subscription<Msg>
Topic<Msg>
string
SubscriptionConfig<Msg>
Subscription<Msg>
A topic is a resource to which you can publish messages to be delivered to subscribers of that topic.
Msg extends object
Publisher<Msg>
new Topic<Msg>(name, cfg): Topic<Msg>
string
TopicConfig<Msg>
Topic<Msg>
TopicPerms.constructor
readonly cfg: TopicConfig<Msg>
readonly name: string
publish(msg): Promise<string>
Msg
Promise<string>
ref<P>(): P
P extends TopicPerms
P
Msg extends object
abstract publish(msg): Promise<string>
Msg
Promise<string>
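To show how the constructors and `publish` fit together, here is a minimal sketch built on hypothetical in-memory stand-ins. The local `Topic` and `Subscription` classes only mirror the signatures listed above; the `attach` method, the generated id format, and the `SignupEvent` type are assumptions made for illustration and are not part of the library.

```typescript
// Hypothetical in-memory stand-ins that mirror the documented signatures.
// They are NOT the library's implementation.
type DeliveryGuarantee = "at-least-once" | "exactly-once";

interface TopicConfig {
  deliveryGuarantee: DeliveryGuarantee;
}

interface SubscriptionConfig<Msg> {
  handler: (msg: Msg) => Promise<unknown>;
}

class Topic<Msg extends object> {
  readonly name: string;
  readonly cfg: TopicConfig;
  private handlers: Array<(msg: Msg) => Promise<unknown>> = [];
  private nextId = 1;

  constructor(name: string, cfg: TopicConfig) {
    this.name = name;
    this.cfg = cfg;
  }

  // Resolves to a string, as documented; treating it as a message id is an assumption.
  async publish(msg: Msg): Promise<string> {
    const id = `${this.name}-${this.nextId++}`;
    for (const handler of this.handlers) {
      await handler(msg);
    }
    return id;
  }

  // Hypothetical hook used only by the stand-in Subscription below.
  attach(handler: (msg: Msg) => Promise<unknown>): void {
    this.handlers.push(handler);
  }
}

class Subscription<Msg extends object> {
  readonly name: string;

  constructor(topic: Topic<Msg>, name: string, cfg: SubscriptionConfig<Msg>) {
    this.name = name;
    topic.attach(cfg.handler);
  }
}

interface SignupEvent {
  userId: string;
}

const received: SignupEvent[] = [];
const signups = new Topic<SignupEvent>("signups", { deliveryGuarantee: "at-least-once" });
const welcome = new Subscription(signups, "send-welcome-email", {
  handler: async (event) => {
    received.push(event);
  },
});

const published: Promise<string> = signups.publish({ userId: "u1" });
```

In real code the two classes would come from the library rather than being declared locally; only the call shapes are meant to carry over.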
RetryPolicy defines how a subscription should handle retries after errors either delivering the message or processing the message.
The values given to this structure are parsed at compile time, so that the correct cloud resources can be provisioned to support the queue.
As such, the values given here may be clamped to the values supported by the target cloud (i.e. min/max values are brought within the supported range).
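Clamping here means pulling a requested value into the range a provider supports. The sketch below illustrates the arithmetic only; the `SupportedRange` type, the `clampToRange` helper, and the limits of 1 and 600 seconds are hypothetical and do not describe any particular cloud.

```typescript
// Hypothetical limits for an imaginary provider; real limits depend on the target cloud.
interface SupportedRange {
  minSeconds: number;
  maxSeconds: number;
}

// Pull the requested value into [minSeconds, maxSeconds].
function clampToRange(requestedSeconds: number, range: SupportedRange): number {
  return Math.min(Math.max(requestedSeconds, range.minSeconds), range.maxSeconds);
}

const exampleRange: SupportedRange = { minSeconds: 1, maxSeconds: 600 };

const tooLarge = clampToRange(3600, exampleRange); // raised requests are capped at 600
const tooSmall = clampToRange(0.1, exampleRange); // small requests are lifted to 1
const inRange = clampToRange(30, exampleRange); // values inside the range are unchanged
```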
optional maxBackoff?: DurationString
The maximum time to wait between retries. Defaults to 10 minutes.
optional maxRetries?: number
MaxRetries is used to control dead letter queuing logic, when:
- n == 0: A default value of 100 retries will be used
- n > 0: Encore will forward a message to a dead letter queue after n retries
- n == pubsub.InfiniteRetries: Messages will not be forwarded to the dead letter queue by the Encore framework
optional minBackoff?: DurationString
The minimum time to wait between retries. Defaults to 10 seconds.
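The three `maxRetries` cases can be read as a small decision function. This is a sketch under stated assumptions: the value of the infinite-retries sentinel is not given in this reference, so `INFINITE_RETRIES = -1` is a placeholder, and treating an omitted value the same as 0 is also an assumption. The helper name is hypothetical.

```typescript
// Assumption: a negative sentinel stands in for the infinite-retries constant.
const INFINITE_RETRIES = -1;
const DEFAULT_MAX_RETRIES = 100;

// Returns how many retries happen before dead-lettering,
// or null when messages are never forwarded to the dead letter queue.
function retriesBeforeDeadLetter(maxRetries: number = 0): number | null {
  if (maxRetries === INFINITE_RETRIES) return null;
  if (maxRetries === 0) return DEFAULT_MAX_RETRIES;
  if (maxRetries < 0) {
    throw new RangeError(`unsupported maxRetries value: ${maxRetries}`);
  }
  return maxRetries;
}

const defaulted = retriesBeforeDeadLetter(0); // falls back to the default of 100
const explicit = retriesBeforeDeadLetter(5); // dead-letter after 5 retries
const never = retriesBeforeDeadLetter(INFINITE_RETRIES); // never dead-lettered
```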
SubscriptionConfig is used when creating a subscription.
The values given here may be clamped to the values supported by the target cloud (e.g. the ack deadline may be brought within the range supported by the target cloud's pubsub implementation).
Msg
optional ackDeadline?: DurationString
AckDeadline is the time a consumer has to process a message before it's returned to the subscription.
The default is 30 seconds. The ack deadline must be at least 1 second.
handler: (msg) => Promise<unknown>
Handler is the function which will be called to process a message sent on the topic.
When this function throws an error (that is, the returned promise rejects), the message will be negatively acknowledged (nacked), which will cause a redelivery attempt to be made (unless the retry policy's MaxRetries has been reached).
Msg
Promise<unknown>
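The relationship between a handler's result and acknowledgement can be modelled in a few lines. The `deliverOnce` function below is a hypothetical stand-in for a single delivery attempt, not the runtime's actual code, and `OrderPlaced` and `handleOrder` are illustrative names.

```typescript
type Outcome = "ack" | "nack";

// Hypothetical model of one delivery attempt: a rejected handler promise becomes a nack.
async function deliverOnce<Msg>(
  msg: Msg,
  handler: (msg: Msg) => Promise<unknown>,
): Promise<Outcome> {
  try {
    await handler(msg);
    return "ack";
  } catch {
    return "nack";
  }
}

interface OrderPlaced {
  orderId: string;
  total: number;
}

// Illustrative handler: throwing signals that the message should be redelivered.
async function handleOrder(msg: OrderPlaced): Promise<void> {
  if (msg.total < 0) {
    throw new Error(`invalid total for order ${msg.orderId}`);
  }
}

const okOutcome = deliverOnce({ orderId: "o1", total: 20 }, handleOrder);
const badOutcome = deliverOnce({ orderId: "o2", total: -5 }, handleOrder);
```

Whether a nacked message is actually redelivered then depends on the retry policy, as described above.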
optional maxConcurrency?: number
MaxConcurrency is the maximum number of messages which will be processed simultaneously per instance of the service for this subscription.
Note that this is per instance of the service, so if your service has scaled to 10 instances and this is set to 10, then 100 messages could be processed simultaneously.
If the value is negative, then there will be no limit on the number of messages processed simultaneously.
Note: This is not supported by all cloud providers. Specifically, on GCP, when using Cloud Run instances on an unordered topic, the subscription will be configured as a Push Subscription and will have adaptive concurrency. See GCP Push Delivery Rate.
This setting also has no effect on Encore Cloud environments. If not set, it uses a reasonable default based on the cloud provider.
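Because the limit applies per instance, the service-wide ceiling is a simple product. The helper below is a hypothetical illustration of that arithmetic; it ignores the provider-specific exceptions noted above.

```typescript
// Upper bound on messages processed at once across all instances of a service.
// A negative maxConcurrency means there is no limit.
function maxSimultaneousMessages(instances: number, maxConcurrency: number): number {
  if (maxConcurrency < 0) return Infinity;
  return instances * maxConcurrency;
}

const scaledOut = maxSimultaneousMessages(10, 10); // 10 instances x 10 each = 100
const unlimited = maxSimultaneousMessages(10, -1); // no limit
```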
optional messageRetention?: DurationString
MessageRetention is how long an undelivered message is kept on the topic before it's purged.
Default is 7 days.
optional retryPolicy?: RetryPolicy
RetryPolicy defines how a message should be retried when the subscriber returns an error.
TopicConfig is used when creating a Topic.
Msg extends object
deliveryGuarantee: DeliveryGuarantee
DeliveryGuarantee is used to configure the delivery guarantee of a Topic.
optional orderingAttribute?: keyof { [Key in string | number | symbol as Extract<Msg[Key], brandedAttribute<string> | brandedAttribute<number> | brandedAttribute<false> | brandedAttribute<true>> extends never ? never : Key]: never }
OrderingAttribute is the message attribute to use as an ordering key for messages. Delivery will ensure that messages with the same value are delivered in the order they were published.
If OrderingAttribute is not set, messages can be delivered in any order.
It is important to note that if a subscription handler returns an error, the message will be retried before any subsequent messages for that ordering key are delivered. This means that, depending on the retry configuration, a large backlog of messages for a given ordering key may build up. When using OrderingAttribute, it is recommended to reason about your failure modes and set the retry configuration appropriately.
Once the maximum number of retries has been reached, the message will be forwarded to the dead letter queue, and the next message for that ordering key will be delivered.
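The blocking behaviour for a single ordering key can be sketched as a loop. This is a hypothetical model, not the delivery implementation: `drainOrderingKey` is an invented helper, the handler returns `false` to signal failure, and counting one initial attempt plus up to `maxRetries` retries is an assumption about how retries are counted.

```typescript
interface Delivery {
  delivered: string[];
  deadLettered: string[];
  attempts: number;
}

// Hypothetical model of delivery for ONE ordering key.
function drainOrderingKey(
  messages: string[],
  handler: (msg: string, attempt: number) => boolean,
  maxRetries: number,
): Delivery {
  const result: Delivery = { delivered: [], deadLettered: [], attempts: 0 };
  for (const msg of messages) {
    let succeeded = false;
    // One initial attempt plus up to maxRetries retries (assumed counting).
    for (let attempt = 0; attempt <= maxRetries && !succeeded; attempt++) {
      result.attempts++;
      succeeded = handler(msg, attempt);
    }
    // Later messages for this key wait until this one succeeds or is dead-lettered.
    (succeeded ? result.delivered : result.deadLettered).push(msg);
  }
  return result;
}

// "logout" always fails, so it holds back "login-again" until its retries run out.
const outcome = drainOrderingKey(
  ["login", "logout", "login-again"],
  (msg) => msg !== "logout",
  3,
);
```

With a large `maxRetries` and long backoffs, the failing message in the middle is what causes the backlog described above.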
To create attributes on a message, use the Attribute type:
type UserEvent = {
  user_id: Attribute<string>;
  action: string;
};

const topic = new Topic<UserEvent>("user-events", {
  deliveryGuarantee: "at-least-once",
  // Messages with the same user_id will be delivered in the order they were published
  orderingAttribute: "user_id",
});

await topic.publish({ user_id: "1", action: "login" }); // This message will be delivered before the logout
await topic.publish({ user_id: "2", action: "login" }); // This could be delivered at any time because it has a different user id
await topic.publish({ user_id: "1", action: "logout" }); // This message will be delivered after the first message
By using OrderingAttribute, the throughput will be limited depending on the cloud provider:
Note: OrderingAttribute currently has no effect during local development.
type Attribute<T> = T | brandedAttribute<T>
Attribute represents a field on a message that should be sent as an attribute in a PubSub message, rather than in the message body.
This is useful for ordering messages, or for filtering messages on a subscription - otherwise you should not use this.
To create attributes on a message, use the Attribute type:
type Message = {
user_id: Attribute<number>;
name: string;
};
const msg: Message = {
  user_id: 123,
  name: "John Doe",
};
The union of brandedAttribute is simply used to help the TypeScript compiler understand that the type is an attribute and allow the AttributesOf type to extract the keys of said type.
T extends string | number | boolean
type AttributesOf<T> = keyof { [Key in keyof T as Extract<T[Key], allBrandedTypes> extends never ? never : Key]: never }
AttributesOf is a helper type to extract all keys from an object whose type is an Attribute type.
For example:
type Message = {
  user_id: Attribute<number>;
  name: string;
  age: Attribute<number>;
};
type MessageAttributes = AttributesOf<Message>; // "user_id" | "age"
T extends object
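To see the key filtering work end to end, the sketch below declares local copies of the types. The definition of `brandedAttribute` is not shown in this reference, so the brand used here (`__attributeBrand: true`) is a hypothetical stand-in, as is the `allBrandedTypes` union; only the shapes of `Attribute` and `AttributesOf` follow the signatures above.

```typescript
// Hypothetical stand-in for the library's internal brand.
type brandedAttribute<T> = T & { readonly __attributeBrand: true };

// Assumed union of every branded primitive an attribute may hold.
type allBrandedTypes =
  | brandedAttribute<string>
  | brandedAttribute<number>
  | brandedAttribute<boolean>;

type Attribute<T extends string | number | boolean> = T | brandedAttribute<T>;

// Keep only the keys whose field type contains a branded member.
type AttributesOf<T extends object> = keyof {
  [Key in keyof T as Extract<T[Key], allBrandedTypes> extends never ? never : Key]: never;
};

type Message = {
  user_id: Attribute<number>;
  name: string;
  age: Attribute<number>;
};

type MessageAttributes = AttributesOf<Message>;

// Only attribute keys are accepted here; adding "name" would be a type error.
const attributeKeys: MessageAttributes[] = ["user_id", "age"];

// Plain values are still assignable, because Attribute<T> includes T itself.
const msg: Message = { user_id: 123, name: "John Doe", age: 42 };
```

The plain `name: string` field has no branded member, so `Extract` yields `never` for it and the key is dropped from the mapped type.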
type DeliveryGuarantee = "at-least-once" | "exactly-once"
DeliveryGuarantee is used to configure the delivery contract for a topic.
<!-- symbol-end -->
Re-exports DurationString
<!-- symbol-end -->