Tip
Using Broadway? Check out the companion project: off_broadway_pulsar.
An Elixir client for Apache Pulsar.
Add :pulsar_elixir to your dependencies in mix.exs:
def deps do
[
{:pulsar, "~> 2.8.4", hex: :pulsar_elixir}
]
endAssuming you have Pulsar running on localhost:6650, the quickest way to consume messages
from a Pulsar topic is using the Reader interace as shown below
"persistent://my-tenant/my-namespace/my-topic"
|> Pulsar.Reader.stream(host: "pulsar://localhost:6650", timeout: 100)
|> Enum.map(fn msg -> String.to_integer(msg.payload) end)
|> Enum.filter(fn n -> rem(n, 2) == 0 end)
|> Enum.map(fn n -> n * 2 end)For more complex scenarios and assuming that you have implemented a basic consumer like the one below:
defmodule MyPulsarConsumer do
use Pulsar.Consumer.Callback
def handle_message(message, state) do
IO.puts("Received: #{message.payload}")
{:ok, state}
end
endYou can start producing and consuming messages with the following configuration:
config :pulsar,
host: "pulsar://localhost:6650",
consumers: [
my_consumer: [
topic: "persistent://my-tenant/my-namespace/my-topic",
subscription_name: "my-subscription",
callback_module: MyPulsarConsumer
]
],
producers: [
my_producer: [
topic: "persistent://my-tenant/my-namespace/my-topic"
]
]Sending a message using the configured producer can be done as follows:
Pulsar.send(:my_producer, "Hello, Pulsar!")By default, brokers, consumers and producers are started within the scope of the
:default client, but you can also configure multiple clients (which may come in
handy if you need to connect to multiple clusters).
clients: [
client_1: [
host: "pulsar://host.cluster1.com:6650"
],
client_2: [
host: "pulsar://host.cluster2.com:6650"
]
]Then, you can specify the client in the consumer or producer configuration using
the client key, eg. client: :client_1.
producers: [
my_producer_1: [
client: :client_1
topic: "persistent://my-tenant/my-namespace/my-topic"
]
]If your Pulsar cluster requires authentication, you can configure it in the client
using the auth key:
auth: [
type: Pulsar.Auth.OAuth2,
opts: [
client_id: "<YOUR-OAUTH2-CLIENT-ID>",
client_secret: "<YOUR-OAUTH2-CLIENT-SECRET>",
site: "<YOUR-OAUTH2-ISSUER-URL>",
audience: "<YOUR-OAUTH2-AUDIENCE>"
]
]Important
Do not forget to add the following line to your /etc/hosts file before running the tests:
127.0.0.1 broker1 broker2
To run the tests, run the following command:
mix test
If you want to run only a subset of tests, specify the file including the tests you want to run
mix test test/integration/consumer_test.exs
You can also run individual tests by passing the line number where they are defined
mix test test/integration/consumer_test.exs:43
The examples directory includes a number of examples that demonstrate the use of the Pulsar client.
For example:
mix run --no-start examples/bingo.exs
The full feature matrix for Apache Pulsar can be found here.
| Component | Feature | Supported |
|---|---|---|
| Client | TLS encryption | ✅ |
| Client | Authentication | |
| Client | Transaction | ❌ |
| Client | Statistics | ❌ |
| Producer | Sync send | ✅ |
| Producer | Async send | ❌ |
| Producer | Batching | ✅ |
| Producer | Chunking | ✅ |
| Producer | Compression | ✅ |
| Producer | Schema | ✅ |
| Producer | Partitioned topics | ✅ |
| Producer | Access modes | ✅ |
| Consumer | ACK | ✅ |
| Consumer | Batch-index ACK | ✅ |
| Consumer | NACK | ✅ |
| Consumer | NACK back-off | ❌ |
| Consumer | Batching | ✅ |
| Consumer | Partitioned topics | ✅ |
| Consumer | Chunking | ✅ |
| Consumer | Seek | ✅ |
| Consumer | Subscription types | ✅ |
| Consumer | Subscription modes | ✅ |
| Consumer | Retry letter topic | ❌ |
| Consumer | Dead letter topic | ✅ |
| Consumer | Compression | ✅ |
| Consumer | Compaction | ✅ |
| Consumer | Schema | ✅ |
| Consumer | Configurable flow control settings | ✅ |
| Reader | ✅ | |
| TableView | ❌ |