docs/docs/cn/cluster-mode/development.md
在单节点环境中,插件通常可以通过进程内状态、事件或任务来完成需求;而在集群模式下,同一插件可能同时运行在多个实例上,面临以下典型问题:
NocoBase 核心在应用层预置了多种中间件接口,帮助插件在集群环境下复用统一能力。下面将结合源码介绍缓存、同步消息、消息队列与分布式锁的用法及最佳实践。
对于要保存在内存中的数据,建议使用系统内置的缓存组件进行管理。
app.cache 获取默认缓存实例。Cache 提供 set/get/del/reset 等基础操作,还支持 wrap 与 wrapWithCondition 封装缓存逻辑,以及 mset/mget/mdel 等批量方法。ttl,避免实例重启导致缓存丢失。// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
this.cache = await this.app.cacheManager.createCache({
name: 'auth',
prefix: 'auth',
store: 'redis',
});
await this.cache.wrap('token:config', async () => {
const repo = this.app.db.getRepository('tokenPolicies');
return repo.findOne({ filterByTk: 'default' });
}, 60 * 1000);
}
如果内存中的状态无法使用分布式缓存(如无法序列化),那么当状态随用户操作发生变化时,需要将变化通过同步信号通知到其他实例,以保持状态一致。
sendSyncMessage,内部调用 app.syncMessageManager.publish 并自动为通道加上应用级前缀,避免通道冲突。publish 可指定 transaction,消息会在数据库事务提交后再发送,保证状态与消息同步。handleSyncMessage 处理其他实例发来的消息,可在 beforeLoad 阶段订阅,对配置变更、Schema 同步等场景十分适用。示例:plugin-data-source-main 通过同步消息保持多节点 schema 一致
export class PluginDataSourceMainServer extends Plugin {
async handleSyncMessage(message) {
if (message.type === 'syncCollection') {
await this.app.db.getRepository('collections').load(message.collectionName);
}
}
private sendSchemaChange(data, options) {
this.sendSyncMessage(data, options); // 自动调用 app.syncMessageManager.publish
}
}
消息广播是同步信号的底层组件,也支持直接使用。当需要在实例间广播消息时,可通过该组件实现。
app.pubSubManager.subscribe(channel, handler, { debounce }) 可在实例间订阅通道;debounce 选项用于去抖动,避免重复广播导致的频繁回调。publish 支持 skipSelf(默认 true)与 onlySelf,用于控制消息是否回发到本实例。示例:plugin-async-task-manager 使用 PubSub 广播任务取消事件
const channel = `${plugin.name}.task.cancel`;
await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
this.logger.info(`Task ${id} cancelled on other node`);
await this.stopLocalTask(id);
});
await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });
消息队列用于调度异步任务,适合处理长耗时或可重试的操作。
app.eventQueue.subscribe(channel, { idle, process, concurrency }) 声明消费者,process 返回 Promise,可使用 AbortSignal.timeout 控制超时。publish 会自动补齐应用名前缀,并支持 timeout、maxRetries 等选项,默认适配内存队列,可根据需要切换到 RabbitMQ 等扩展适配器。示例:plugin-async-task-manager 使用 EventQueue 调度任务
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
concurrency: this.concurrency,
idle: this.idle,
process: async (payload, { signal }) => {
await this.runTask(payload.id, { signal });
},
});
await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });
在需要避免竞态操作时,可以使用分布式锁来序列化对资源的访问。
local 适配器,可注册 Redis 等分布式实现;通过 app.lockManager.runExclusive(key, fn, ttl) 或 acquire/tryAcquire 控制并发。ttl 用于兜底释放锁,防止异常情况下锁被永远持有。示例:plugin-data-source-main 使用分布式锁保护字段删除流程
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
await fieldModel.remove(options);
this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});
app.cache、app.syncMessageManager 等能力,避免在插件中重复实现跨节点通信逻辑。transaction.afterCommit(syncMessageManager.publish 已内置)以保证数据与消息一致。timeout、maxRetries、debounce,防止在异常情况下产生新的流量洪峰。通过以上能力,插件可以在不同实例间安全共享状态、同步配置、调度任务,满足集群部署场景下的稳定性与一致性要求。