pip/pip-290.md
Web Socket Proxy Server provides a simple way to interact with Pulsar under WSS protocol.
2.11, there is a feature that provides a way to set encrypt keys for the internal producers and consumers of Web Socket Proxy Server, but needs the user to upload both public key and private key into the Web Socket Proxy Server(in other words: user should expose the keys to Web Socket Proxy Server), there is a un-recommended workaround for this shortcoming<sup>[1]</sup>. The benefit is that the WSS producer and WSS consumer should not care about encryption and decryption.The Construction of the Encrypt Context:
{
"batchSize": 2, // How many single messages are in the batch. If null, it means it is not a batched message.
"compressionType": "NONE", // the compression type.
"uncompressedMessageSize": 0, // the size of the uncompressed payload.
"keys": {
"client-rsa.pem": { // key name.
"keyValue": "asdvfdw==", // key value.
"metadata": {} // extra props of the key.
}
},
"param": "Tfu1PxVm6S9D3+Hk" // the IV of current encryption for this message.
}
All the fields of Encrypt Context are used to parse the encrypted message payload.
keys and param are used to decrypt the encrypted message payload.compressionType and uncompressedMessageSize are used to uncompress the compressed message payload.batchSize is used to extract the batched message payload.There is another attribute named encryptionAlgo used to identify what encrypt algo is using, it is an optional attribute, so there is no such property in Encrypt Context.
When the internal consumer of the Web Socket Proxy Server receives a message, if the message metadata indicates that the message is encrypted, the consumer will add Encrypt Context into the response for the WSS consumer.
CryptoKeyReader: an interface that requires users to implement to read public key and private key.MessageCrypto: a tool interface to encrypt and decrypt the message payload and add and extract encryption information for message metadata.Therefore, there is no way to enable encryption under the WSS protocol and meet the following conditions:
Provide a way to make Web Socket Proxy Server just passes encrypt information to the client, the WSS producer and WSS consumer did encrypt and decrypt themselves.
Since the order of producer operation for message payloads is compression --> encryption, users need to handle Compression themselves if needed.
If other clients(such as Java, CPP) are sending messages to the topic that the WSS consumer was subscribed to, it is possible that there are some batched messages in the topic, then the WSS consumer will inevitably receive the batched messages. Since the order of consumer operation for message payload is deencryption --> un-compression --> extract the batched messages, users need to handle Un-compression and Extract Batch Messages themselves.
This proposal does not intend to support the three features:
For WSS producers:
Modify the definition of parameter encryptionKeys to make it can set in two ways:
encryptionKeys, then Web Socket Proxy Server will still work in the original way, which is defined in the PIP Support encryption in Web Socket Proxy ServerencryptionKeys, and the encryptionKeys[{key_name}].keyValue is not empty, Web Socket Proxy Server will mark this Producer as Client-Side Encryption Producer, then discard server-side batch messages, server-side compression, and server-side encryption. The constructor of encryptionKeys is like below:{
"client-ecdsa.pem": {
"keyValue": "BDJfN+Iw==",
"metadata": {
"k1": "v1"
}
}
}
For WSS consumers: Users can set the parameter cryptoFailureAction to CONSUME to directly receive the undecrypted message payload (it was supported before).
For the producers marked as Client-Side Encryption Producers:
CryptoKeyReader to DummyCryptoKeyReaderImpl.
DummyCryptoKeyReaderImpl: doesn't provide any public key or private key, and just returns null.MessageCrypto to WSSDummyMessageCryptoImpl to skip the message Server-Side encryption.
WSSDummyMessageCryptoImpl: only set the encryption info into the message metadata and discard payload encryption.enableBatching to false to skip Server-Side batch messages building, and print a log if the discarded parameters enableBatching, batchingMaxMessages, maxPendingMessages, batchingMaxPublishDelay were set.CompressionType to None to skip the Server-Side compression, and print a log if the discarded parameter compressionType was set.enableChunking to false(the default value is false) to prevent unexpected problems if the default setting is changed in the future.For the client-side encryption consumers:
cryptoFailureAction of the consumer is CONSUME, just print an DEBUG level log when receiving an encrypted message if the consumer could not decrypt it(the original log level is WARN).Define a new mode for the parameter encryptionKeys:
| param name | description | constructor (before encode) |
|---|---|---|
encryptionKeys | Base64 encoded and URL encoded and JSON formatted encryption keys | Map<String, EncryptionKey> |
Add JSON attributes below:
| param name | description | constructor (before encode) |
|---|---|---|
compressionType | Compression type. Do not set it if compression is not performed | CompressionType |
uncompressedMessageSize | The size of the payload before compression. Do not set it if compression is not performed | int |
encryptionParam | Base64 encoded serialized initialization vector used when the client encrypts | byte[] |
public void connect() {
String protocolAndHostPort = "ws://localhost:55217";
String topicName = "perssitent://public/default/tp1";
String keys = ```
{
"client-ecdsa.pem": {
"keyValue": "BDJf/72DhLRs0C0/U+vkykeIBfXaaJiwpqPVgWJvV7B7GwqIMvY6OFXdFvi0gx7Co/0xO7vKTHLQP8GZAt8DWrsCb8W1jhxmOjpThHBaksXG0kN+Iw==",
"metadata": {
"k1": "v1"
}
}
}
```
StringBuilder producerUrL = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/producer/persistent/")
.append(topicName)
.append("?")
.append("encryptionKeys=").append(base64AndURLEncode(keys));
WebSocketClient wssClient = new WebSocketClient();
wssClient.start();
Session session = wssClient.connect(this, producerUrL, new ClientUpgradeRequest()).get();
}
public void sendMessage() {
byte[] payload = "msg-123".getBytes(UTF-8); // [109, 115, 103, 45, 49, 50, 51]
String msgKey = "client-ecdsa.pem";
// Compression if needed(optional).
CompressionType compressionType = CompressionType.LZ4;
msg.uncompressedMessageSize = 5;
byte[] compressedPayload = compress(payload); // [109, 115, 103, 45, 49, 50, 51]
// Encrypt if needed.
bytes[] encryptionParam = getEncryptionParam(); // [-10, -5, -124, 23, 14, -122, 30, 127, 64, 63, 85, -79]
String base64EncodedEncryptionParam = base64Encode(encryptionParam); // 9vuEFw6GHn9AP1Wx
bytes[] encryptedPayload = encrypt(compressedPayload, encryptionParam); // H2RbToHyfXrAUJq3kCC81wlmpGRU5l4=
// Do send.
ProducerMessage msg = new ProducerMessage();
msg.key = msgKey;
msg.payload = encryptedPayload;
msg.encryptionParam = base64EncodedEncryptionParam;
msg.compressionType = compressionType;
msg.uncompressedMessageSize = uncompressedMessageSize;
this.session.getRemote().sendString(toJSON(msg));
}
public void connect() {
String protocolAndHostPort = "ws://localhost:55217";
String topicName = "perssitent://public/default/tp1";
StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/consumer/persistent/")
.append(topicName)
.append("/")
.append(subscriptionName)
.append("?")
.append("subscriptionType=").append(subscriptionType.toString())
// Set "cryptoFailureAction" to "CONSUME".
.append("&").append("cryptoFailureAction=CONSUME");
WebSocketClient wssClient = new WebSocketClient();
wssClient.start();
Session session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
}
public byte[] messageReceived(String text) {
/**
* A demo of the parameter "text":
* {
* "messageId": "CAcQADAA",
* "payload": "ApU16CsV0iHO2zbX7T22jhGMzdjE5drm",
* "properties": {},
* "publishTime": "2023-08-22T02:40:32.856+08:00",
* "redeliveryCount": 0,
* "encryptionContext": {
* "keys": {
* "client-ecdsa.pem": {
* "keyValue": "BMQKA==",
* "metadata": {
* "k1": "v1"
* },
* "param": "SnqNyjPetp1dGBa6",
* "compressionType": "LZ4",
* "uncompressedMessageSize": 7,
* "batchSize": null
* }
* }
*/
ConsumerMessage msg = parseJsonToObject(text);
/**
* The constructor of encryptionContext:
* {
* "client-ecdsa.pem": {
* "keyValue": "BMQKA==",
* "metadata": {
* "k1": "v1"
* }
* }
* }
*/
EncryptionContext encryptionContext = msg.encryptionContext;
// base64Decode and decrypt message payload.
byte[] decryptedPayload = decrypt(base64Decode(msg.payload), encryptionContext);
//Un-compress is needed.
byte[] unCompressedPayload = unCompressIfNeeded(decryptedPayload);
return unCompressedPayload;
}
[1]: A workaround to avoid exposing the private key to Web Socket Proxy Server(should expose the public key to Web Socket Proxy Server). A quick background: there are three policies when a consumer cannot describe the message payload:
unackMessagesTracker. How this message is ultimately handled depends on the policy of unacknowledged messages.Workaround
cryptoFailureAction to CONSUME for the WSS consumerEncryptionKeyInfo to null for the CryptoKeyReader. This will make the internal consumer of Web Socket Proxy Server decrypt message payload fail.Then the flow of Pub & Sub will be executed like the following: