pip/pip-312.md
States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual function and shared between instances of that function.
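The counter encoding can be illustrated with a minimal sketch. The class and method names below (`CounterEncodingSketch`, `encode`, `decode`) are hypothetical and not part of Pulsar; only the 64-bit big-endian layout comes from the text above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical helper, not a Pulsar class: shows the 64-bit big-endian layout only.
class CounterEncodingSketch {

    // Encode a counter as 8 bytes, most significant byte first.
    static byte[] encode(long counter) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(counter)
                .array();
    }

    // Decode an 8-byte big-endian value back into a counter.
    static long decode(byte[] value) {
        if (value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
        }
        return ByteBuffer.wrap(value).order(ByteOrder.BIG_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(258L);                 // 258 = 0x0102
        System.out.println(Arrays.toString(bytes));  // [0, 0, 0, 0, 0, 0, 1, 2]
        System.out.println(decode(bytes));           // 258
    }
}
```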
Pulsar Functions use a StateStoreProvider to initialize a StateStore that manages state, so multiple state storage backends can be supported, such as:
- BKStateStoreProviderImpl: use Apache BookKeeper as the backend
- PulsarMetadataStateStoreProviderImpl: use Pulsar Metadata as the backend

Users can also implement their own StateStoreProvider to support other state storage backends.
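As a rough illustration of what a pluggable backend involves, the sketch below defines a simplified in-memory provider. `SketchStateStore` and `SketchStateStoreProvider` are hypothetical stand-ins with far fewer methods than the real Pulsar interfaces; only the `getStateStore(tenant, namespace, name)` shape is taken from this proposal. It also shows state being scoped to one function and shared by every caller that asks for the same function.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins; the real Pulsar interfaces have more methods.
interface SketchStateStore {
    void put(String key, byte[] value);
    byte[] get(String key);
}

interface SketchStateStoreProvider {
    SketchStateStore getStateStore(String tenant, String namespace, String name);
}

// An in-memory backend: one store per fully qualified function name.
class InMemoryStateStoreProvider implements SketchStateStoreProvider {
    private final Map<String, SketchStateStore> stores = new ConcurrentHashMap<>();

    @Override
    public SketchStateStore getStateStore(String tenant, String namespace, String name) {
        String fqfn = tenant + "/" + namespace + "/" + name;
        // Callers asking for the same function get the same store instance.
        return stores.computeIfAbsent(fqfn, k -> new InMemoryStateStore());
    }

    private static final class InMemoryStateStore implements SketchStateStore {
        private final Map<String, byte[]> data = new ConcurrentHashMap<>();

        @Override
        public void put(String key, byte[] value) {
            data.put(key, value.clone()); // defensive copy of the binary value
        }

        @Override
        public byte[] get(String key) {
            byte[] value = data.get(key);
            return value == null ? null : value.clone();
        }
    }
}
```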
The Broker also exposes two endpoints to put and query a state key of a function:
Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's StorageAdminClient directly to put and query state,
which tightly couples the Pulsar Functions state store to Apache BookKeeper.
See: code
This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper so that it can support other state storage backends.
None
- Replace the StorageAdminClient in ComponentImpl with StateStoreProvider to manage state.
- Add a cleanup method to the StateStoreProvider interface.

In the ComponentImpl#getFunctionState and ComponentImpl#queryState methods, replace the StorageAdminClient with StateStoreProvider:
String tableNs = getStateNamespace(tenant, namespace);
String tableName = functionName;
String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
if (storageClient.get() == null) {
    storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
            .withSettings(StorageClientSettings.newBuilder()
                    .serviceUri(stateStorageServiceUrl)
                    .clientName("functions-admin")
                    .build())
            .withNamespace(tableNs)
            .build());
}
...
Replace with:
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
Add a cleanup method to the StateStoreProvider interface:
default void cleanUp(String tenant, String namespace, String name) throws Exception {}
When a function is deleted, its state store should be deleted as well.
Currently, the state store table is also deleted through BookKeeper's StorageAdminClient:
deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);

private void deleteStatestoreTableAsync(String namespace, String table) {
    StorageAdminClient adminClient = worker().getStateStoreAdminClient();
    if (adminClient != null) {
        adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
            if ((throwable == null && res)
                    || ((throwable instanceof NamespaceNotFoundException
                    || throwable instanceof StreamNotFoundException))) {
                log.info("{}/{} table deleted successfully", namespace, table);
            } else {
                if (throwable != null) {
                    log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
                } else {
                    log.error("{}/{} table deletion failed but moving on", namespace, table);
                }
            }
        });
    }
}
This proposal therefore adds a cleanUp method to StateStoreProvider and calls it after a function is deleted:
worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
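The call site could be wrapped roughly as follows. This is a sketch under assumptions: `SketchCleanableProvider` and `StateCleanupSketch` are hypothetical names, and swallowing the exception is an assumption that mirrors the "but moving on" behavior of the existing BookKeeper-based deletion, not something this proposal specifies.

```java
// Hypothetical, trimmed-down stand-in for the StateStoreProvider interface.
interface SketchCleanableProvider {
    // No-op by default, so providers written before this method existed keep compiling.
    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
}

class StateCleanupSketch {
    private final SketchCleanableProvider provider;

    StateCleanupSketch(SketchCleanableProvider provider) {
        this.provider = provider;
    }

    // Returns true if cleanup succeeded; a failure is reported but never rethrown,
    // so a failing state backend does not block the function deletion itself.
    boolean cleanUpAfterDelete(String tenant, String namespace, String name) {
        try {
            provider.cleanUp(tenant, namespace, name);
            return true;
        } catch (Exception e) {
            System.err.printf("%s/%s/%s state cleanup failed but moving on: %s%n",
                    tenant, namespace, name, e);
            return false;
        }
    }
}
```

Because the default implementation does nothing, a backend that needs no explicit cleanup can simply leave `cleanUp` unimplemented.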
Add a new init method to the StateStoreProvider interface:
The current init method requires a FunctionDetails parameter, but FunctionDetails is not available in the ComponentImpl class,
and neither BKStateStoreProviderImpl nor PulsarMetadataStateStoreProviderImpl uses this parameter.
For backward compatibility, instead of changing the existing init method, this proposal adds a new init method without the FunctionDetails parameter:
default void init(Map<String, Object> config) throws Exception {}
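The backward-compatibility argument can be sketched as follows. All types here are hypothetical stand-ins: `SketchFunctionDetails` replaces the real FunctionDetails, the existing two-argument `init` is assumed to have the shape shown, and the `stateStorageServiceUrl` config key is an assumption chosen for illustration.

```java
import java.util.Map;

// Placeholder for the real FunctionDetails type (hypothetical).
final class SketchFunctionDetails {}

// Hypothetical stand-in for the provider interface with both init variants.
interface SketchInitProvider {
    // Assumed shape of the existing method; left unchanged.
    default void init(Map<String, Object> config, SketchFunctionDetails details) throws Exception {}

    // New overload from this proposal: a no-op by default.
    default void init(Map<String, Object> config) throws Exception {}
}

// Written against the old interface only; still compiles without changes.
class LegacyProviderSketch implements SketchInitProvider {
    boolean initialized;

    @Override
    public void init(Map<String, Object> config, SketchFunctionDetails details) {
        initialized = true;
    }
}

// Opts in to the new overload, which needs no FunctionDetails.
class NewProviderSketch implements SketchInitProvider {
    String serviceUrl;

    @Override
    public void init(Map<String, Object> config) {
        // The config key is an assumption for this example.
        serviceUrl = (String) config.get("stateStorageServiceUrl");
    }
}
```

One consequence of the no-op default: calling the new `init` on a provider that only overrides the old one does nothing, so a provider that needs setup on this path would have to override the new overload as well.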
None
Nothing needs to be done.