pip/pip-297.md
The Pulsar Function is a serverless computing framework that runs on top of Pulsar and processes messages.
The Pulsar IO Connector is a framework that allows users to easily integrate Pulsar with external systems, such as
databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage
connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and
sink. A source connector imports data from another system into Pulsar, while a sink connector exports data from
Pulsar to another system. The Pulsar IO Connector is implemented on top of the Pulsar Function framework, so in
the rest of this document we treat a connector as a special kind of function: the term "function" refers to both
functions and connectors.
Function Instance is a running instance of either a Pulsar IO Connector, which interacts with a specific external system, or a Pulsar Function, which processes messages from a topic.
Function Framework is the framework that runs function instances.
Function Context is an interface that provides access to various information and resources for the connector or the function. The function context is passed to the connector or the function when it is initialized and can then be used to interact with the Pulsar system.
Function instance thread: The function framework initializes a thread for each function instance to handle the core logic of the function or connector, including consuming messages from the Pulsar topic for a sink connector, executing the function logic, producing messages to the Pulsar topic for a source connector, and handling exceptions. We also define a connector thread (or function thread) as a thread that is created by the connector or function itself.
Exception handling logic: The function itself can throw exceptions. The function instance thread catches the exception and then closes the function, which means the function stops working until it is restarted manually or automatically by the function framework.
Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or the framework:
All exceptions thrown from the connector are treated as fatal exceptions.
If the exception is thrown from the function instance thread, the function framework will catch the exception and terminate the function instance.
If the exception is thrown from the connector thread that is created by the connector itself, the function framework
will not be able to catch the exception and terminate the function instance. The connector will hang forever.
The Motivation section discusses this case in more detail.
If the exception is thrown by the external system, the connector implementation can treat it as a retryable exception and retry processing the message later, or rethrow it to indicate a fatal exception.
All exceptions thrown from a Pulsar Function are treated as non-fatal. The function framework catches and logs the exception but does not terminate the function instance.
There is no way for the function developer to throw a fatal exception to the function framework to terminate the function instance.
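The two cases above can be summarized as a simplified model of how the instance thread currently treats exceptions raised on that thread. This is a hypothetical sketch: `CurrentExceptionPolicy`, `Kind`, and `run` are illustrative names, not Pulsar framework code. A connector exception stops the instance, while a function exception is logged and processing continues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the current policy; not the real framework code.
public class CurrentExceptionPolicy {
    enum Kind { CONNECTOR, FUNCTION }

    // Processes inputs on the "instance thread" and records what happened to each one.
    static List<String> run(Kind kind, List<String> inputs, Function<String, String> logic) {
        List<String> log = new ArrayList<>();
        for (String in : inputs) {
            try {
                log.add("ok:" + logic.apply(in));
            } catch (RuntimeException e) {
                if (kind == Kind.CONNECTOR) {
                    // Connector exceptions are fatal: stop and close the instance.
                    log.add("terminated:" + e.getMessage());
                    return log;
                }
                // Function exceptions are non-fatal: log and keep processing.
                log.add("logged:" + e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Function<String, String> logic = s -> {
            if (s.equals("bad")) {
                throw new IllegalStateException("boom");
            }
            return s.toUpperCase();
        };
        List<String> inputs = List.of("a", "bad", "c");
        System.out.println(run(Kind.CONNECTOR, inputs, logic)); // [ok:A, terminated:boom]
        System.out.println(run(Kind.FUNCTION, inputs, logic));  // [ok:A, logged:boom, ok:C]
    }
}
```

Neither branch offers a function a way to request termination. This is the gap described in the previous sentence.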
Currently, neither connectors nor functions can terminate the function instance when a fatal exception is thrown outside the function instance thread, because the existing exception handlers for connectors and Pulsar Functions only handle exceptions thrown within that thread.
For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue here: https://github.com/apache/pulsar/issues/9464
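The following sketch reproduces the sink scenario in plain Java. It assumes a hypothetical `write` method that hands each batch to a connector-owned `ExecutorService` and returns immediately. When the batch fails, the exception goes to the worker thread's uncaught-exception handler, which by default prints a stack trace to stderr. The caller, which stands in for the function instance thread, never observes it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LostExceptionDemo {
    // Hypothetical stand-in for a sink's write(): hands the batch to the
    // connector's own thread and returns immediately.
    static void write(ExecutorService flusher) {
        flusher.execute(() -> {
            // Fatal failure on the connector thread. It is delivered to this
            // thread's uncaught-exception handler, not to the caller of write().
            throw new IllegalStateException("external system rejected batch");
        });
    }

    // Returns true if the calling "instance thread" ever observes the failure.
    static boolean instanceThreadSeesFailure() {
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        boolean seen = false;
        try {
            write(flusher);
        } catch (RuntimeException e) {
            seen = true; // never reached: the exception happens on another thread
        }
        flusher.shutdown();
        try {
            flusher.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("instance thread saw failure: " + instanceThreadSeesFailure());
    }
}
```

Running `main` prints `instance thread saw failure: false`. In a real sink, the instance thread would keep consuming while no batch is ever written.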
The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We fixed it by adding
the notifyError method to the PushSource class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this
does not solve the problem for all source connectors, because not all of them are implemented based on
the PushSource class.
The same problem exists for Pulsar Functions: currently, a function cannot throw a fatal exception to the function framework. We need to provide a way for function developers to do so.
We need a way for the connector and function developers to throw fatal exceptions outside the function instance thread. The function framework should catch these exceptions and terminate the function accordingly.
Introduce a new method, fatal, to the context. All connector implementation code and function code
can call the fatal method on this context to raise a fatal exception and terminate the instance.
After the connector or function raises the fatal exception, the function instance thread is interrupted. The function framework can then catch the exception, log it, and terminate the function instance.
This PIP proposes adding a new method, fatal, to the context BaseContext. This method allows connector or
function code to report a fatal exception to the function framework and terminate the instance. SinkContext
and SourceContext both inherit from BaseContext, so all sink and source connectors can
invoke this new method. The Pulsar Function context class, Context, also inherits from BaseContext, so
function code can invoke this new method as well.
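A minimal sketch of the connector side follows. It assumes a locally declared `BaseContext` that contains only the proposed `fatal(Throwable)` method; the real interface has many more members. `BatchingSink` and `flushAsync` are hypothetical. In a real connector, the context would be the SinkContext or SourceContext passed to `open()`. The key change from the previous failure mode is that the connector's own thread catches the error and reports it, instead of letting it die with the thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FatalFromConnectorThread {
    // Simplified stand-in for the proposed BaseContext#fatal (assumption: only this method).
    interface BaseContext {
        void fatal(Throwable t);
    }

    // Hypothetical sink that flushes batches on its own thread.
    static class BatchingSink {
        private final BaseContext context;
        private final ExecutorService flusher = Executors.newSingleThreadExecutor();

        BatchingSink(BaseContext context) {
            this.context = context;
        }

        void flushAsync(Runnable externalWrite) {
            flusher.execute(() -> {
                try {
                    externalWrite.run();
                } catch (Throwable t) {
                    // Report the failure so the framework can terminate the instance.
                    context.fatal(t);
                }
            });
        }

        void close() throws InterruptedException {
            flusher.shutdown();
            flusher.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    // Runs one failing flush and returns whatever was passed to fatal().
    static Throwable runDemo() {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        BaseContext ctx = t -> {
            reported.set(t);
            done.countDown();
        };
        BatchingSink sink = new BatchingSink(ctx);
        sink.flushAsync(() -> {
            throw new IllegalStateException("connection refused");
        });
        try {
            done.await(1, TimeUnit.SECONDS);
            sink.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reported.get();
    }

    public static void main(String[] args) {
        System.out.println("reported to fatal(): " + runDemo());
    }
}
```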
In the fatal method, the function instance thread is interrupted. The function instance thread can then
catch the InterruptedException and retrieve the fatal exception. The function framework then logs this exception,
reports it to the metrics, and finally terminates the function instance.
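The interrupt-based flow above can be sketched as follows. Only the role of `deathException` comes from this proposal. `MiniInstanceRunnable`, `enqueue`, `terminationReason`, and `runUntilFatal` are hypothetical, and the sketch is not the actual `JavaInstanceRunnable` code. The `fatal` method records the first exception and interrupts the instance thread. The loop re-checks `deathException`, so a `fatal` call that arrives before the thread is published is still honored.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the instance thread; not JavaInstanceRunnable.
public class MiniInstanceRunnable implements Runnable {
    private final AtomicReference<Throwable> deathException = new AtomicReference<>();
    private final BlockingQueue<String> input = new LinkedBlockingQueue<>();
    private volatile Thread instanceThread;
    private volatile String terminationReason = "running";

    // What a context's fatal(t) could delegate to; safe to call from any thread.
    public void fatal(Throwable t) {
        deathException.compareAndSet(null, t); // keep the first fatal exception
        Thread th = instanceThread;
        if (th != null) {
            th.interrupt(); // wake the instance thread if it is blocked
        }
    }

    public void enqueue(String msg) {
        input.add(msg);
    }

    @Override
    public void run() {
        instanceThread = Thread.currentThread();
        try {
            while (deathException.get() == null) {
                String msg = input.take(); // blocks until a message or an interrupt
                // ... process msg ...
            }
        } catch (InterruptedException e) {
            // Expected when fatal() interrupts a blocked take().
        }
        Throwable fatal = deathException.get();
        // A real framework would log the exception, update metrics, and close the instance here.
        terminationReason = fatal != null ? "fatal: " + fatal.getMessage() : "interrupted";
    }

    public String terminationReason() {
        return terminationReason;
    }

    // Starts an instance, raises a fatal error from a separate "connector thread",
    // and returns how the instance terminated.
    public static String runUntilFatal(String message) {
        MiniInstanceRunnable instance = new MiniInstanceRunnable();
        Thread instanceThread = new Thread(instance, "function-instance");
        instanceThread.start();
        instance.enqueue("record-1");
        Thread connectorThread = new Thread(
                () -> instance.fatal(new RuntimeException(message)), "connector-flusher");
        connectorThread.start();
        try {
            connectorThread.join();
            instanceThread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return instance.terminationReason();
    }

    public static void main(String[] args) {
        System.out.println(runUntilFatal("sink flush failed"));
    }
}
```

Running `main` prints `fatal: sink flush failed`. Interruption is used rather than polling a flag alone because the instance thread is usually blocked in a receive call, and `take()` only wakes up on a message or an interrupt.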
The behavior when invoking the fatal method: the exception passed to fatal is stored in deathException, a field in the
class JavaInstanceRunnable that is used to store the fatal exception.
Introduce the fatal method to BaseContext:
```java
public interface BaseContext {
    /**
     * Terminate the function instance with a fatal exception.
     *
     * @param t the fatal exception to be raised
     */
    void fatal(Throwable t);
}
```
No changes for this part.
No changes for this part.
No changes for this part.
No changes for this part.
No changes for this part.
No security-related changes.
The new fatal method only affects the current function instance. It does not affect other function instances,
even if they run in the same function worker.
No operation required.
No operation required.
The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer.
However, it requires changes to existing interfaces, including Source and Sink, which can complicate connector
development. Moreover, we would still need the fatal method for cases such as terminating the instance from code outside
the message processing logic, which this alternative solution cannot handle.
This alternative would also be more complex to implement, because it involves changes to the core message processing logic of the function framework: the entire message processing logic would have to be turned into an asynchronous pattern.