In the previous Multi-Process Model chapter, we covered the framework's multi-process model in detail. Its Agent process suits a common class of scenarios: some middleware clients need to keep a persistent connection to a server. In theory, each application server should establish only one persistent connection. However, the multi-process model creates n times as many connections (n = number of Worker processes).
+--------+   +--------+
| Client |   | Client |   ... n
+--------+   +--------+
    |  \     /   |
    |    \ /     |        n * m links
    |    / \     |
    |  /     \   |
+--------+   +--------+
| Server |   | Server |   ... m
+--------+   +--------+
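To make the n * m figure concrete, here is a minimal sketch. `countLinks` is a hypothetical helper, and the process and server counts are made-up numbers chosen only for illustration.

```javascript
// Hypothetical helper (not part of the framework): counts persistent links
// when every client process connects to every server.
function countLinks({ clientProcesses, servers }) {
  return clientProcesses * servers;
}

// 4 Worker processes, each holding its own client, against 3 servers.
const perWorker = countLinks({ clientProcesses: 4, servers: 3 });
// A single process holding the client against the same 3 servers.
const shared = countLinks({ clientProcesses: 1, servers: 3 });

console.log(perWorker, shared); // 12 3
```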
To reuse persistent connections as much as possible (they are a valuable resource for the server), we maintain them in the Agent process and pass data to each Worker via messenger. This works, but it often takes a lot of code to wrap the interface and implement the data transfer, which is tedious.
In addition, passing data via messenger is relatively inefficient, because messenger relays everything through the Master. If the IPC channel fails, it may well bring the Master process down.
So is there a better way? The answer is: YES! We provide a new model that reduces the complexity of wrapping this type of client. The new model bypasses the Master by establishing a direct socket connection between Agent and Worker. Acting as the external interface, the Agent maintains the connection shared among multiple Worker processes.
Under the new mode, the client's communication is as follows:
             +-------+
             | start |
             +---+---+
                 |
        +--------+---------+
      __| port competition |__
win /   +------------------+  \ lose
   /                           \
+---------------+     tcp conn     +-------------------+
| Leader(Agent) |<---------------->| Follower(Worker1) |
+---------------+                  +-------------------+
    |            \ tcp conn
    |             \
+--------+         +-------------------+
| Client |         | Follower(Worker2) |
+--------+         +-------------------+
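The "port competition" step in the diagram can be sketched with Node's `net` module: every process tries to listen on the same port, the one that succeeds acts as Leader, and the ones that get `EADDRINUSE` act as Followers. This is only an illustration of the idea. `competeForPort` is a hypothetical helper, and the real cluster-client implementation may differ.

```javascript
const net = require('node:net');

// Hypothetical helper: resolves with the role this process won.
function competeForPort(port, host = '127.0.0.1') {
  return new Promise((resolve, reject) => {
    const server = net.createServer();
    server.once('error', (err) => {
      if (err.code === 'EADDRINUSE') {
        // Another process already owns the port, so we lost the competition.
        resolve({ role: 'follower', server: null });
      } else {
        reject(err);
      }
    });
    // Listening successfully means we won and become the Leader.
    server.listen(port, host, () => resolve({ role: 'leader', server }));
  });
}
```

A Follower would then open a TCP connection to that port, while the Leader keeps the returned server to accept those connections.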
We abstract client interfaces into the following two broad categories, which also serve as a specification for client interfaces. Clients that conform to this specification can be wrapped automatically in Leader / Follower mode.
- Subscribe / Publish mode:
  - subscribe(info, listener) takes two parameters: the first is the information to subscribe to, and the second is the callback invoked with the subscription result.
  - publish(info) takes one parameter: the information to publish.
- Invoke mode: asynchronous calls such as getData(id) in the client example.
Client example:
const { Base } = require('sdk-base');
class Client extends Base {
constructor(options) {
super(options);
// remember to invoke ready after initialization is successful
this.ready(true);
}
/**
* Subscribe
*
* @param {Object} info - subscription information (a JSON object, try not to include attributes such as Function, Buffer, Date)
* @param {Function} listener - monitoring callback function, receives a parameter as the result of monitoring
*/
subscribe(info, listener) {
// ...
}
/**
* Publish
*
* @param {Object} info - publishing information, which is similar to that of subscribe described above
*/
publish(info) {
// ...
}
/**
* Get data (invoke)
*
* @param {String} id - id
* @return {Object} result
*/
async getData(id) {
// ...
}
}
Leader and Follower exchange data via the following protocols:
0       1       2               4                                                               12
+-------+-------+---------------+---------------------------------------------------------------+
|version|req/res|   reserved    |                          request id                           |
+-------------------------------+-------------------------------+-------------------------------+
|            timeout            |   connection object length    |   application object length   |
+-------------------------------+---------------------------------------------------------------+
| conn object (JSON format) ...                             |            app object             |
+-----------------------------------------------------------+                                   |
|                                                                                          ...  |
+-----------------------------------------------------------------------------------------------+
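As a rough illustration of the layout above, the following sketch packs and unpacks such a packet with Node's `Buffer`. Several details are assumptions that the diagram does not state: big-endian integers, a 24-byte header (12 bytes per row), 0 = request and 1 = response for the req/res byte, and JSON for the app object as well (the real module uses a configurable serializer). `encodePacket` and `decodePacket` are hypothetical names.

```javascript
const HEADER_LENGTH = 24; // two 12-byte rows, as read from the diagram

function encodePacket({ version = 1, isResponse = false, id, timeout, connObj, appObj }) {
  const connBuf = Buffer.from(JSON.stringify(connObj));
  const appBuf = Buffer.from(JSON.stringify(appObj)); // assumption: JSON here too
  const header = Buffer.alloc(HEADER_LENGTH); // zero-filled, so reserved bytes 2-3 stay 0
  header.writeUInt8(version, 0);
  header.writeUInt8(isResponse ? 1 : 0, 1); // assumption: 0 = request, 1 = response
  header.writeBigUInt64BE(BigInt(id), 4); // 8-byte request id
  header.writeUInt32BE(timeout, 12);
  header.writeUInt32BE(connBuf.length, 16);
  header.writeUInt32BE(appBuf.length, 20);
  return Buffer.concat([header, connBuf, appBuf]);
}

function decodePacket(buf) {
  const connLength = buf.readUInt32BE(16);
  const appLength = buf.readUInt32BE(20);
  const appStart = HEADER_LENGTH + connLength;
  return {
    version: buf.readUInt8(0),
    isResponse: buf.readUInt8(1) === 1,
    id: Number(buf.readBigUInt64BE(4)),
    timeout: buf.readUInt32BE(12),
    connObj: JSON.parse(buf.toString('utf8', HEADER_LENGTH, appStart)),
    appObj: JSON.parse(buf.toString('utf8', appStart, appStart + appLength)),
  };
}

const packet = encodePacket({
  id: 7,
  timeout: 3000,
  connObj: { type: 'subscribe' },
  appObj: { dataId: 'demo.DemoService' },
});
console.log(decodePacket(packet).appObj.dataId); // demo.DemoService
```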
 +----------+             +---------------+          +---------+
 | Follower |             |  Local Server |          |  Leader |
 +----------+             +---------------+          +---------+
      |     register channel     |       assign to        |
      + -----------------------> |  --------------------> |
      |                          |                        |
      |                                subscribe          |
      + ------------------------------------------------> |
      |                                 publish           |
      + ------------------------------------------------> |
      |                                                   |
      |       subscribe result                            |
      | <------------------------------------------------ +
      |                                                   |
      |                                 invoke            |
      + ------------------------------------------------> |
      |          invoke result                            |
      | <------------------------------------------------ +
      |                                                   |
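The flow above can be imitated in a single process to show why the real client only sees one subscription no matter how many Followers subscribe. This is a simplified sketch without sockets. `FakeRegistry`, `Leader` and `Follower` are hypothetical stand-ins, not classes from cluster-client.

```javascript
const { EventEmitter } = require('node:events');

// Stand-in for the real client that talks to the remote server.
class FakeRegistry extends EventEmitter {
  constructor() {
    super();
    this.subscribeCalls = 0;
  }
  subscribe(info, listener) {
    this.subscribeCalls += 1;
    this.on(info.dataId, listener);
  }
  publish(info) {
    this.emit(info.dataId, info.publishData);
  }
}

// The Leader owns the real client and fans results out to all listeners.
class Leader {
  constructor(realClient) {
    this.realClient = realClient;
    this.listenersByKey = new Map();
  }
  subscribe(info, listener) {
    const key = JSON.stringify(info);
    let listeners = this.listenersByKey.get(key);
    if (!listeners) {
      listeners = new Set();
      this.listenersByKey.set(key, listeners);
      // Only the first subscription for a key reaches the real client.
      this.realClient.subscribe(info, (value) => {
        for (const fn of listeners) fn(value);
      });
    }
    listeners.add(listener);
  }
  publish(info) {
    this.realClient.publish(info);
  }
}

// A Follower simply forwards to the Leader (over TCP in the real model).
class Follower {
  constructor(leader) {
    this.leader = leader;
  }
  subscribe(info, listener) {
    this.leader.subscribe(info, listener);
  }
  publish(info) {
    this.leader.publish(info);
  }
}
```

With two Followers subscribed to the same `dataId`, a single `publish` reaches both listeners while `subscribeCalls` on the real client stays at 1.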
The following simple example shows how to make a client support Leader / Follower mode in the framework.
// registry_client.js
const { parse } = require('node:url');
const { Base } = require('sdk-base');
class RegistryClient extends Base {
constructor(options) {
super({
// Specify a method for asynchronous start
initMethod: 'init',
});
this._options = options;
this._registered = new Map();
}
/**
* Start logic
*/
async init() {
this.ready(true);
}
/**
* Get configuration
* @param {String} dataId - the dataId
* @return {Object} configuration
*/
async getConfig(dataId) {
return this._registered.get(dataId);
}
/**
* Subscribe
* @param {Object} reg
* - {String} dataId - the dataId
* @param {Function} listener - the listener
*/
subscribe(reg, listener) {
const key = reg.dataId;
this.on(key, listener);
const data = this._registered.get(key);
if (data) {
process.nextTick(() => listener(data));
}
}
/**
* publish
* @param {Object} reg
* - {String} dataId - the dataId
* - {String} publishData - the publish data
*/
publish(reg) {
const key = reg.dataId;
let changed = false;
if (this._registered.has(key)) {
const arr = this._registered.get(key);
if (arr.indexOf(reg.publishData) === -1) {
changed = true;
arr.push(reg.publishData);
}
} else {
changed = true;
this._registered.set(key, [reg.publishData]);
}
if (changed) {
this.emit(
key,
this._registered.get(key).map((url) => parse(url, true)),
);
}
}
}
module.exports = RegistryClient;
Wrap RegistryClient with the agent.cluster interface:
// agent.js
const RegistryClient = require('./registry_client');
module.exports = (agent) => {
// encapsulate and instantiate RegistryClient
agent.registryClient = agent
.cluster(RegistryClient)
// the argument of create is passed to the RegistryClient constructor
.create({});
agent.beforeStart(async () => {
await agent.registryClient.ready();
agent.coreLogger.info('registry client is ready');
});
};
Similarly, use the app.cluster interface to wrap RegistryClient:
// app.js
const RegistryClient = require('./registry_client');
module.exports = (app) => {
app.registryClient = app.cluster(RegistryClient).create({});
app.beforeStart(async () => {
await app.registryClient.ready();
app.coreLogger.info('registry client is ready');
// invoke subscribe to subscribe
app.registryClient.subscribe(
{
dataId: 'demo.DemoService',
},
(val) => {
// ...
},
);
// invoke publish to publish data
app.registryClient.publish({
dataId: 'demo.DemoService',
publishData: 'xxx',
});
// invoke getConfig interface
const res = await app.registryClient.getConfig('demo.DemoService');
console.log(res);
});
};
Simple, isn't it?
Of course, if your client is not so "standard", you may need some other APIs. For example, your subscription function may be named sub rather than subscribe:
const { Base } = require('sdk-base');
class MockClient extends Base {
constructor(options) {
super({
initMethod: 'init',
});
this._options = options;
this._registered = new Map();
}
async init() {
this.ready(true);
}
sub(reg, listener) {
const key = reg.dataId;
this.on(key, listener);
const data = this._registered.get(key);
if (data) {
process.nextTick(() => listener(data));
}
}
...
}
You need to set it manually with the delegate API:
// agent.js
module.exports = (agent) => {
agent.mockClient = agent
.cluster(MockClient)
// delegate sub to logic of subscribe
.delegate('sub', 'subscribe')
.create();
agent.beforeStart(async () => {
await agent.mockClient.ready();
});
};
// app.js
module.exports = (app) => {
app.mockClient = app
.cluster(MockClient)
// delegate sub to subscribe logic
.delegate('sub', 'subscribe')
.create();
app.beforeStart(async () => {
await app.mockClient.ready();
app.mockClient.sub({ id: 'test-id' }, (val) => {
// put your code here
});
});
};
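Conceptually, delegate records which role a non-standard method name should play. The sketch below models that bookkeeping only. `DelegateTable` is a hypothetical class rather than the cluster-client implementation, and treating every unmapped method as a plain invoke is an assumption of this sketch.

```javascript
// Hypothetical model of the mapping that delegate() sets up.
class DelegateTable {
  constructor() {
    // Methods with the standard names need no explicit mapping.
    this.roles = new Map([
      ['subscribe', 'subscribe'],
      ['publish', 'publish'],
    ]);
  }
  delegate(from, to) {
    this.roles.set(from, to);
    return this; // chainable, like the calls above
  }
  roleOf(method) {
    // Assumption: anything unmapped is handled as an asynchronous invoke.
    return this.roles.get(method) || 'invoke';
  }
}

const table = new DelegateTable().delegate('sub', 'subscribe');
console.log(table.roleOf('sub')); // subscribe
console.log(table.roleOf('getData')); // invoke
```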
As we have seen, cluster-client lets us develop a "pure" RegistryClient without understanding the multi-process model. We only need to focus on interacting with the server, and a simple wrap with cluster-client yields a ClusterClient that supports the multi-process model. The RegistryClient here is actually a DataClient, responsible solely for data communication with the remote service.
You may have noticed that ClusterClient also brings several constraints. To expose the same methods to every process, RegistryClient can only support the sub/pub mode and asynchronous API calls, because all interaction between processes goes through socket communication, which is inherently asynchronous.
Suppose we want to implement a synchronous get method: subscribed data is kept in memory, and get returns it directly. How can this be achieved? Real situations may be even more complicated.
Here we introduce a best practice, the APIClient. For modules that need synchronous APIs, such as reading cached data, an APIClient is built on top of RegistryClient to implement the APIs that do not involve interaction with the remote server. The APIClient instance is what gets exposed to the user.
In the APIClient's internal implementation:
- Interfaces that do not involve server interaction, such as synchronous calls, are implemented in the APIClient. Since ClusterClient's APIs have already smoothed over the multi-process differences, there is no need to think about the multi-process model when calling RegistryClient while developing the APIClient.
For example, add a synchronous get method backed by a cache in the APIClient module:
// some-client/index.js
const cluster = require('cluster-client');
const { Base } = require('sdk-base');
const RegistryClient = require('./registry_client');
class APIClient extends Base {
constructor(options) {
super(options);
// options.cluster is used to pass app.cluster to Egg's plugin
this._client = (options.cluster || cluster)(RegistryClient).create(options);
this._client.ready(() => this.ready(true));
this._cache = {};
// subMap:
// {
// foo: reg1,
// bar: reg2,
// }
const subMap = options.subMap;
for (const key in subMap) {
this.subscribe(subMap[key], (value) => {
this._cache[key] = value;
});
}
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
// at last the module exposes this APIClient
module.exports = APIClient;
Then we can use this module like this:
// app.js || agent.js
const APIClient = require('some-client'); // the module above
module.exports = app => {
const config = app.config.apiClient;
app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster }));
app.beforeStart(async () => {
await app.apiClient.ready();
});
};
// config.${env}.js
exports.apiClient = {
subMap: {
foo: {
id: '',
},
// bar...
}
};
To make it easy for you to encapsulate APIClient, we provide an APIClientBase base class in the cluster-client module. Then APIClient above can be rewritten as:
const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase {
// return the original client class
get DataClient() {
return RegistryClient;
}
// used to set the cluster-client related parameters, equivalent to the second parameter of the cluster method
get clusterOptions() {
return {
responseTimeout: 120 * 1000,
};
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
In conclusion:
+------------------------------------------------+
| APIClient                                      |
|       +----------------------------------------|
|       | ClusterClient                          |
|       |      +---------------------------------|
|       |      | RegistryClient                  |
+------------------------------------------------+
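- RegistryClient: responsible for data communication with the remote service. It supports asynchronous APIs only and does not care about the multi-process model.
- ClusterClient: a client instance wrapped by the cluster-client module, responsible for automatically smoothing over the differences of the multi-process model.
- APIClient: internally calls ClusterClient to synchronize data, with no need to care about the multi-process model. It is the module finally exposed to users. APIs are exposed through it and can be synchronous or asynchronous.
Readers who are interested may take a look at the enhanced multi-process development model discussion.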
/**
* @property {Number} responseTimeout - response timeout, default is 60000
* @property {Transcode} [transcode]
* - {Function} encode - custom serialize method
* - {Function} decode - custom deserialize method
*/
config.clusterClient = {
responseTimeout: 60000,
};
| Configuration Items | Type | Default | Description |
|---|---|---|---|
| responseTimeout | number | 60000 (one minute) | Global inter-process communication timeout. Do not set it too short, because the proxied interface itself has its own timeout setting |
| transcode | function | N/A | Serialization for inter-process communication (encode / decode); defaults to serialize-json (setting it manually is not recommended) |
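For illustration only, a custom transcode could look like the sketch below. The shape is assumed from the comment block above: encode turns an object into a Buffer and decode turns it back. Plain JSON is used here instead of the default serialize-json, which also shows why attributes such as Date are best avoided in the transmitted objects.

```javascript
// Assumed shape: encode(obj) returns a Buffer, decode(buf) returns the object.
const transcode = {
  encode: (obj) => Buffer.from(JSON.stringify(obj)),
  decode: (buf) => JSON.parse(buf.toString('utf8')),
};

// It would be passed as: config.clusterClient = { responseTimeout: 60000, transcode };

// A Date does not survive a plain JSON round trip unchanged.
const roundTripped = transcode.decode(
  transcode.encode({ dataId: 'demo.DemoService', at: new Date(0) }),
);
console.log(roundTripped.dataId); // demo.DemoService
console.log(typeof roundTripped.at); // string
```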
The above is the global configuration. To configure a single client separately:
- Override it by setting options in app/agent.cluster(ClientClass, options):
app.registryClient = app
.cluster(RegistryClient, {
responseTimeout: 120 * 1000, // the parameters passing here are related to cluster-client
})
.create({
// here are parameters required by RegistryClient
});
- Or override the clusterOptions getter in APIClientBase:
const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase {
get DataClient() {
return RegistryClient;
}
get clusterOptions() {
return {
responseTimeout: 120 * 1000,
};
}
// ...
}
module.exports = APIClient;