pip/pip-4.md
Many systems use Pulsar to move messages between system components. Many of these systems publish, or would like to publish, messages that contain sensitive information. Across Pulsar clusters, network capture tools can be used to read these messages; in addition, anyone with access to the persistent storage layer, or with administrative access to Pulsar, can read all messages. Currently, clients that want to encrypt published messages have to implement their own solution outside of the Pulsar client. It would be better if Pulsar itself supported end-to-end encryption as a feature. Once implemented, Pulsar clients will have the option to protect their data by encrypting at the producer and decrypting at the consumer, which prevents anyone other than a consumer with the appropriate key from decrypting the data.
The Pulsar client is modified so that a producer object may have one or more keys bound to it. At a minimum, the API will require a key name, the key content, and the key type (for future extensibility).
The Pulsar client is modified so that a consumer object may have one or more keys bound to it for decryption. These would be the private keys corresponding to the public ones given to the producer. The user should be able to dynamically alter the set of bound keys.
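As a rough illustration of the two requirements above (a key is described by name, content, and type, and the bound set can change at runtime), the following is a minimal sketch of such a key set. The class `BoundKeysSketch`, the `KeyType` enum, and the `bind`/`unbind` methods are hypothetical names chosen for this example and are not part of the Pulsar client API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

public class BoundKeysSketch {
    /** Hypothetical key types; the proposal only says a type field exists for extensibility. */
    enum KeyType { RSA, ECDSA }

    /** Hypothetical value object holding the three fields the proposal requires. */
    static final class BoundKey {
        final String name;
        final byte[] content;
        final KeyType type;

        BoundKey(String name, byte[] content, KeyType type) {
            this.name = name;
            this.content = content.clone(); // defensive copy of the key material
            this.type = type;
        }
    }

    // A concurrent map lets the application alter the key set while messages are in flight.
    private final Map<String, BoundKey> keys = new ConcurrentHashMap<>();

    /** Binds a key, replacing any existing key with the same name. */
    void bind(String name, byte[] content, KeyType type) {
        keys.put(name, new BoundKey(name, content, type));
    }

    /** Removes a key; returns true if a key with that name was bound. */
    boolean unbind(String name) {
        return keys.remove(name) != null;
    }

    /** Returns a sorted snapshot of the currently bound key names. */
    Set<String> names() {
        return new TreeSet<>(keys.keySet());
    }

    public static void main(String[] args) {
        BoundKeysSketch bound = new BoundKeysSketch();
        bound.bind("myapp.key", new byte[] {1, 2, 3}, KeyType.RSA);
        bound.bind("myapp.key2", new byte[] {4, 5, 6}, KeyType.ECDSA);
        bound.unbind("myapp.key");
        System.out.println(bound.names());
    }
}
```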
Generate a public/private key pair and store it in a file or keystore. Key management and distribution are outside the scope of Pulsar.
* Generating ECDSA key pair
1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
1. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out test_ecdsa_pubkey.pem
* Generating RSA key pair
1. openssl genrsa -out test_rsa_privkey.pem 2048
1. openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
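For applications that prefer to create keys programmatically, the same kind of RSA key pair can be produced with the standard Java security API. This is only a sketch: the class name `KeyPairPemSketch` and its helper methods are made up for this example, and the PEM output uses the JDK's default encodings (X.509 SubjectPublicKeyInfo for the public key, PKCS#8 for the private key), which may differ in container format from what a given OpenSSL version writes.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class KeyPairPemSketch {
    /** Wraps DER bytes in a PEM envelope with 64-character base64 lines. */
    static String toPem(String label, byte[] der) {
        String body = Base64.getMimeEncoder(64, new byte[] {'\n'}).encodeToString(der);
        return "-----BEGIN " + label + "-----\n" + body + "\n-----END " + label + "-----\n";
    }

    /** Generates an RSA key pair of the requested size using the default provider. */
    static KeyPair generateRsa(int bits) throws NoSuchAlgorithmException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(bits);
        return generator.generateKeyPair();
    }

    public static void main(String[] args) throws Exception {
        KeyPair pair = generateRsa(2048);
        // getEncoded() yields SubjectPublicKeyInfo for the public key and PKCS#8 for the private key.
        System.out.print(toPem("PUBLIC KEY", pair.getPublic().getEncoded()));
        System.out.print(toPem("PRIVATE KEY", pair.getPrivate().getEncoded()));
    }
}
```

Writing the two strings to files (or loading the pair into a keystore) is left to the application, in line with key management being outside the scope of Pulsar.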
ProducerConfiguration conf = new ProducerConfiguration();
conf.addEncryptionKey("myapp.key");
In some cases, the producer may want to encrypt the session key with several keys individually and publish each encrypted copy in the message together with the corresponding key name. Call conf.addEncryptionKey("myapp.key") with each key name to add it to the producer config. The consumer will be able to decrypt the message as long as it has access to at least one of the keys.
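The multi-key scheme described above can be sketched with the standard Java cryptography API. This is an illustration of the general idea, not the Pulsar client implementation: the `Envelope` class, the choice of AES-GCM for the payload and RSA-OAEP for wrapping the session key, and the second key name `myapp.key2` are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.InvalidKeyException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class SessionKeyEnvelopeSketch {
    // Algorithms are assumptions for this sketch, not taken from the proposal text.
    static final String DATA_CIPHER = "AES/GCM/NoPadding";
    static final String WRAP_CIPHER = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding";

    /** Hypothetical message envelope: one encrypted copy of the session key per key name. */
    static final class Envelope {
        final Map<String, byte[]> wrappedSessionKeys = new LinkedHashMap<>();
        final byte[] iv = new byte[12];
        byte[] ciphertext;
    }

    /** Encrypts the payload once, then encrypts the session key separately for every public key. */
    static Envelope encrypt(byte[] payload, Map<String, PublicKey> publicKeys)
            throws GeneralSecurityException {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey sessionKey = keyGen.generateKey();

        Envelope env = new Envelope();
        new SecureRandom().nextBytes(env.iv);
        Cipher aes = Cipher.getInstance(DATA_CIPHER);
        aes.init(Cipher.ENCRYPT_MODE, sessionKey, new GCMParameterSpec(128, env.iv));
        env.ciphertext = aes.doFinal(payload);

        for (Map.Entry<String, PublicKey> entry : publicKeys.entrySet()) {
            Cipher rsa = Cipher.getInstance(WRAP_CIPHER);
            rsa.init(Cipher.ENCRYPT_MODE, entry.getValue());
            env.wrappedSessionKeys.put(entry.getKey(), rsa.doFinal(sessionKey.getEncoded()));
        }
        return env;
    }

    /** Decrypts with a single private key; any one of the named keys is sufficient. */
    static byte[] decrypt(Envelope env, String keyName, PrivateKey privateKey)
            throws GeneralSecurityException {
        byte[] wrapped = env.wrappedSessionKeys.get(keyName);
        if (wrapped == null) {
            throw new InvalidKeyException("message carries no session key for " + keyName);
        }
        Cipher rsa = Cipher.getInstance(WRAP_CIPHER);
        rsa.init(Cipher.DECRYPT_MODE, privateKey);
        SecretKey sessionKey = new SecretKeySpec(rsa.doFinal(wrapped), "AES");

        Cipher aes = Cipher.getInstance(DATA_CIPHER);
        aes.init(Cipher.DECRYPT_MODE, sessionKey, new GCMParameterSpec(128, env.iv));
        return aes.doFinal(env.ciphertext);
    }

    public static void main(String[] args) throws Exception {
        KeyPairGenerator rsaGen = KeyPairGenerator.getInstance("RSA");
        rsaGen.initialize(2048);
        KeyPair first = rsaGen.generateKeyPair();
        KeyPair second = rsaGen.generateKeyPair();

        Map<String, PublicKey> publicKeys = new LinkedHashMap<>();
        publicKeys.put("myapp.key", first.getPublic());
        publicKeys.put("myapp.key2", second.getPublic());

        Envelope env = encrypt("hello".getBytes(StandardCharsets.UTF_8), publicKeys);
        // The consumer below holds only the second private key and can still read the message.
        byte[] plain = decrypt(env, "myapp.key2", second.getPrivate());
        System.out.println(new String(plain, StandardCharsets.UTF_8));
    }
}
```

The payload is encrypted only once regardless of the number of keys, so adding a key costs one extra public-key operation and one extra wrapped key in the message.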
EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta)
class EncKeyReader implements CryptoKeyReader {
    EncryptionKeyInfo publicKeyInfo = new EncryptionKeyInfo();
    EncryptionKeyInfo privateKeyInfo = new EncryptionKeyInfo();

    EncKeyReader(EncryptionKeyInfo publicKeyInfo, EncryptionKeyInfo privateKeyInfo) {
        this.publicKeyInfo = publicKeyInfo;
        this.privateKeyInfo = privateKeyInfo;
    }

    // The producer encrypts with the public key, so this must return the public key info.
    @Override
    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
        return this.publicKeyInfo;
    }

    // The producer does not decrypt, so no private key is returned.
    @Override
    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
        return null;
    }
}
CryptoKeyReader keyReader = new EncKeyReader(pubKey, privKey);
conf.setCryptoKeyReader(keyReader);
PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
Producer producer = client.createProducer("persistent://property/cluster/ns/topic", conf);
producer.send(msg);
ConsumerConfiguration conf = new ConsumerConfiguration();
EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta)
class EncKeyReader implements CryptoKeyReader {
    EncryptionKeyInfo publicKeyInfo = new EncryptionKeyInfo();
    EncryptionKeyInfo privateKeyInfo = new EncryptionKeyInfo();

    EncKeyReader(EncryptionKeyInfo publicKeyInfo, EncryptionKeyInfo privateKeyInfo) {
        this.publicKeyInfo = publicKeyInfo;
        this.privateKeyInfo = privateKeyInfo;
    }

    // The consumer does not encrypt, so no public key is returned.
    @Override
    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
        return null;
    }

    // The consumer decrypts with the private key matching the producer's public key.
    @Override
    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
        return this.privateKeyInfo;
    }
}
CryptoKeyReader keyReader = new EncKeyReader(pubKey, privKey);
conf.setCryptoKeyReader(keyReader);
PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
Consumer consumer = client.subscribe("persistent://property/cluster/ns/topic", "subscription-name", conf);
Message msg = consumer.receive();
send()/sendAsync() will fail, indicating the cause of the failure. The application has the option to proceed with sending the unencrypted message in such cases. Call conf.setCryptoFailureAction(ProducerCryptoFailureAction) to control the producer behavior. The default behavior is to fail the request.
Call conf.setCryptoFailureAction(ConsumerCryptoFailureAction) to control the consumer behavior. The default behavior is to fail the request.
conf.setCryptoFailureAction() is set to CONSUME.
encryption_keys
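The producer-side choice between failing the request and sending the message unencrypted can be modeled with a small self-contained sketch. Everything here is a stand-in for illustration: the `ProducerAction` enum with values `FAIL` and `SEND`, the `Encryptor` interface, and the `Outgoing` class are assumed names and are not the Pulsar client types.

```java
public class CryptoFailureSketch {
    /** Local stand-in for the producer failure action; the value names are assumptions. */
    enum ProducerAction { FAIL, SEND }

    /** Hypothetical encryption hook that may fail, for example when a key cannot be loaded. */
    interface Encryptor {
        byte[] encrypt(byte[] payload) throws Exception;
    }

    /** Hypothetical result: the bytes to publish and whether they were encrypted. */
    static final class Outgoing {
        final byte[] payload;
        final boolean encrypted;

        Outgoing(byte[] payload, boolean encrypted) {
            this.payload = payload;
            this.encrypted = encrypted;
        }
    }

    /** Applies the configured action when encryption fails. */
    static Outgoing prepare(byte[] payload, Encryptor encryptor, ProducerAction action) {
        try {
            return new Outgoing(encryptor.encrypt(payload), true);
        } catch (Exception e) {
            if (action == ProducerAction.SEND) {
                // The application opted in to publishing the message unencrypted.
                return new Outgoing(payload, false);
            }
            // FAIL: surface the cause to the caller, as send()/sendAsync() would.
            throw new IllegalStateException("encryption failed: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Encryptor broken = payload -> {
            throw new java.security.GeneralSecurityException("key not found");
        };
        Outgoing out = prepare(new byte[] {1, 2, 3}, broken, ProducerAction.SEND);
        System.out.println("encrypted=" + out.encrypted);
    }
}
```

Defaulting to the fail action is the safer choice, because sensitive data is never published in the clear unless the application explicitly asks for it.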