# PIP-453: Improve the metadata store threading model

# Background knowledge

The `pulsar-metadata` module provides two abstractions for interacting with metadata stores:
- `MetadataStore`: the wrapper over the actual underlying metadata store (e.g. ZooKeeper), which caches the value and children of a given key.
- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which performs serialization and deserialization of data between `T` and `byte[]`.

Each broker has a single `MetadataStore` instance, which is shared by multiple `MetadataCache<T>` instances.

However, the implementations of both share a single thread, whose name starts with the metadata store name (e.g. `ZK-MetadataStore`). This thread handles the following tasks:
1. Executing callbacks of APIs like `put`.
2. Executing notification handlers, including `AbstractMetadataStore#accept`, which calls `accept` methods of all `MetadataCache` instances and all listeners registered by `MetadataStore#registerListener`.
3. For ZooKeeper and Etcd, which support batching requests, it's used to schedule flushing tasks at a fixed rate, which is determined by the `metadataStoreBatchingMaxDelayMillis` config (default: 5 ms).
4. Scheduling some other tasks, e.g. retrying failed operations.

It should be noted that `MetadataCache` executes compute-intensive tasks like serialization in the `MetadataStore` callback. When the number of metadata operations grows, this single thread is easily overwhelmed and blocks other tasks, which in turn slows down topic loading since it involves many metadata operations. For example, in a production environment the `pulsar_batch_metadata_store_queue_wait_time` metric was observed to be around 100 ms, while it should normally be close to the 5 ms configured by `metadataStoreBatchingMaxDelayMillis`.

# Motivation

The single-thread model is inefficient when there are many metadata operations. For example, when a broker goes down, the topics it owned are transferred to new owner brokers. Since a new owner broker might never have owned these topics before, its `MetadataCache` caches are cold, which results in many metadata operations. Because CPU-bound tasks like serialization and deserialization are executed in the `MetadataStore` thread, that thread is easily overwhelmed, which increases topic loading time and degrades the overall performance of the broker.

In a production environment, there was a case where the metadata operation rate increased suddenly and the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` metric rose to ~100 ms, which is a part of the total latency of a metadata operation. As a result, the total P99 get latency (`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 seconds.

The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, which means that if a run cannot start on time (every 5 ms by default), the missed runs are executed back to back as soon as the thread becomes free, which further burdens the single metadata store thread.
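
The following is a minimal standalone sketch (not Pulsar code; the flush body is simulated with a sleep) illustrating this `scheduleAtFixedRate` behavior: once a run overruns its 5 ms period, the missed runs fire back to back instead of waiting for the next tick.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateCatchUp {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        executor.scheduleAtFixedRate(() -> {
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("flush at " + elapsedMs + " ms");
            if (elapsedMs < 100) {
                try {
                    // Simulate the thread being busy with callbacks and notification
                    // handlers for much longer than the 5 ms period.
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        // The output shows the delayed runs executing almost immediately one after
        // another once the thread is free, rather than every 5 ms.
        executor.shutdownNow();
    }
}
```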

# Goals

## In Scope

Improve the existing threading model for the various metadata store tasks, so that a single thread is not overwhelmed when there are many metadata operations.

## Out of Scope

Actually, the batching mechanism introduced by [#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` method, which is responsible for sending a batch of metadata operations to the underlying metadata store, is called in the metadata store thread rather than the caller thread. The higher throughput is traded for higher latency, and the benefit is limited because most of the time the metadata operation rate is not that high. See this [test report](https://github.com/BewareMyPower/zookeeper-bench/blob/main/report.md) for more details.

This proposal doesn't intend to change the existing batching mechanism or disable it by default. It only improves the threading model to avoid the single thread being overwhelmed.

Additionally, some code paths execute compute-intensive tasks directly in the metadata store thread (e.g. `store.get(path).thenApply(/* ... */)`); this proposal does not aim to change them to asynchronous variants (e.g. `thenApplyAsync`).

# High Level Design

Create 3 sets of threads:
- `<name>-event`: the original metadata store thread, which is now only responsible for handling notifications. This executor won't be a `ScheduledExecutorService` anymore.
- `<name>-batch-flusher`: a single thread used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false.
- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute-intensive tasks like serialization and deserialization. The same path will always be handled by the same thread, to keep the processing order for that path (a sketch of this path-affinity dispatch follows the list).
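
Below is a minimal sketch of how the `<name>-worker` pool could keep per-path ordering, assuming a simple hash-based dispatch over single-threaded executors (the class and method names are illustrative, not the actual implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: tasks for the same path always land on the same
// single-threaded executor, so serialization/deserialization for a given
// path stays ordered while different paths run in parallel.
class PathAffinityWorkerPool {
    private final ExecutorService[] workers;

    PathAffinityWorkerPool(String name, int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            final String threadName = name + "-worker-" + i;
            workers[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        }
    }

    // Dispatch by the hash of the path so the same path maps to the same thread.
    void execute(String path, Runnable task) {
        int index = (path.hashCode() & Integer.MAX_VALUE) % workers.length;
        workers[index].execute(task);
    }
}
```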

Regarding the callbacks, don't switch to a different thread. This change is not breaking because the underlying metadata store usually executes callbacks in a single thread (e.g. the `<name>-EventThread` in ZooKeeper), just like the single thread in the current implementation. If the callback on a metadata operation result is compute-intensive, the caller is responsible for dispatching it to its own worker threads.
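
For instance, a caller that needs to do heavy work on the result could hand it off to its own executor instead of blocking the callback thread. A minimal sketch follows; the executor, the `MetadataStoreLike` stand-in interface, and the deserialization step are placeholders for illustration, not part of this proposal or the real `MetadataStore` API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallerSideDispatchExample {
    private final ExecutorService myWorkers = Executors.newFixedThreadPool(4);

    // Hypothetical caller-side usage: the metadata store completes the future on its
    // callback thread, and thenApplyAsync moves the expensive work to the caller's pool.
    CompletableFuture<MyConfig> readConfig(MetadataStoreLike store, String path) {
        return store.get(path)
                .thenApplyAsync(bytes -> deserialize(bytes), myWorkers);
    }

    private MyConfig deserialize(byte[] bytes) {
        // Placeholder for an expensive deserialization step.
        return new MyConfig();
    }

    // Minimal stand-ins so the sketch is self-contained.
    interface MetadataStoreLike {
        CompletableFuture<byte[]> get(String path);
    }

    static class MyConfig {
    }
}
```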

The only concern is that introducing a dedicated thread to execute callbacks would allow waiting for the future of another metadata store API inside a callback. Without one, after this change the following usage could deadlock, because the `join()` blocks the very thread that is expected to complete the inner future:

```java
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());
```

Other tasks, like the retry on failure, are executed in the JVM's common `ForkJoinPool` via `CompletableFuture` APIs. For example (a standalone usage sketch follows this diff):

```diff
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                         countsByType, totalSize, opsForLog);
 
                 // Retry with the individual operations
-                executor.schedule(() -> {
-                    ops.forEach(o -> batchOperation(Collections.singletonList(o)));
-                }, 100, TimeUnit.MILLISECONDS);
+                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
+                        ops.forEach(o -> batchOperation(Collections.singletonList(o))));
             } else {
                 MetadataStoreException e = getException(code, path);
                 ops.forEach(o -> o.getFuture().completeExceptionally(e));
```
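
The following is a standalone sketch (not Pulsar code) of what `CompletableFuture.delayedExecutor` does in the diff above: it defers the task and then runs it on the JVM's common `ForkJoinPool` by default, so the retry no longer occupies the metadata store's own scheduled executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DelayedExecutorExample {
    public static void main(String[] args) throws InterruptedException {
        // Runs the task after 100 ms on ForkJoinPool.commonPool() (the default
        // async pool), not on the thread that scheduled it.
        CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)
                .execute(() -> System.out.println("retrying on " + Thread.currentThread().getName()));
        Thread.sleep(200); // keep the JVM alive long enough to observe the output
    }
}
```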

# Detailed Design

## Public-facing Changes

### Configuration

Add a configuration to specify the number of worker threads for `MetadataCache`:

```java
@FieldContext(
    category = CATEGORY_SERVER,
    doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
)
private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors();
```

### Metrics

The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute any tasks other than flushing.

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d
* Mailing List voting thread: https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61