diff --git a/modules/ROOT/nav.adoc b/modules/ROOT/nav.adoc index df6fd49..2249ed0 100644 --- a/modules/ROOT/nav.adoc +++ b/modules/ROOT/nav.adoc @@ -31,15 +31,5 @@ *** xref:pulsar-io:connectors/sinks/kafka.adoc[] *** xref:pulsar-io:connectors/sinks/kinesis.adoc[] *** xref:pulsar-io:connectors/sinks/snowflake.adoc[] -* xref:functions:astream-functions.adoc[] -* Transformation Functions -** xref:functions:index.adoc[] -** xref:functions:cast.adoc[] -** xref:functions:compute.adoc[] -** xref:functions:drop.adoc[] -** xref:functions:flatten.adoc[] -** xref:functions:merge-key-value.adoc[] -** xref:functions:unwrap-key-value.adoc[] -** xref:functions:deploy-in-sink.adoc[] -** xref:functions:drop-fields.adoc[] +* xref:astra-streaming:developing:astream-functions.adoc[{pulsar-short} functions] * xref:astra-streaming:ROOT:astream-subscriptions.adoc[Topic subscriptions] \ No newline at end of file diff --git a/modules/functions/images/astream-advanced-config.png b/modules/functions/images/astream-advanced-config.png deleted file mode 100644 index 5ef7dc2..0000000 Binary files a/modules/functions/images/astream-advanced-config.png and /dev/null differ diff --git a/modules/functions/images/astream-delete-function.png b/modules/functions/images/astream-delete-function.png deleted file mode 100644 index b51cade..0000000 Binary files a/modules/functions/images/astream-delete-function.png and /dev/null differ diff --git a/modules/functions/images/astream-exclamation-function.png b/modules/functions/images/astream-exclamation-function.png deleted file mode 100644 index 0f06eef..0000000 Binary files a/modules/functions/images/astream-exclamation-function.png and /dev/null differ diff --git a/modules/functions/images/astream-function-controls.png b/modules/functions/images/astream-function-controls.png deleted file mode 100644 index 4ff5d1e..0000000 Binary files a/modules/functions/images/astream-function-controls.png and /dev/null differ diff --git 
a/modules/functions/images/astream-function-log.png b/modules/functions/images/astream-function-log.png deleted file mode 100644 index 765f3f0..0000000 Binary files a/modules/functions/images/astream-function-log.png and /dev/null differ diff --git a/modules/functions/images/astream-function-update.png b/modules/functions/images/astream-function-update.png deleted file mode 100644 index 9ac1c2a..0000000 Binary files a/modules/functions/images/astream-function-update.png and /dev/null differ diff --git a/modules/functions/images/astream-io-topics.png b/modules/functions/images/astream-io-topics.png deleted file mode 100644 index bfa3fbd..0000000 Binary files a/modules/functions/images/astream-io-topics.png and /dev/null differ diff --git a/modules/functions/images/astream-name-function.png b/modules/functions/images/astream-name-function.png deleted file mode 100644 index de6ce75..0000000 Binary files a/modules/functions/images/astream-name-function.png and /dev/null differ diff --git a/modules/functions/images/astream-optional-destination-topics.png b/modules/functions/images/astream-optional-destination-topics.png deleted file mode 100644 index b2bf4b1..0000000 Binary files a/modules/functions/images/astream-optional-destination-topics.png and /dev/null differ diff --git a/modules/functions/images/astream-provide-config-keys.png b/modules/functions/images/astream-provide-config-keys.png deleted file mode 100644 index 564d3f0..0000000 Binary files a/modules/functions/images/astream-provide-config-keys.png and /dev/null differ diff --git a/modules/functions/images/astream-transform-functions.png b/modules/functions/images/astream-transform-functions.png deleted file mode 100644 index aac1dd4..0000000 Binary files a/modules/functions/images/astream-transform-functions.png and /dev/null differ diff --git a/modules/functions/pages/astream-functions.adoc b/modules/functions/pages/astream-functions.adoc deleted file mode 100644 index 817c164..0000000 --- 
a/modules/functions/pages/astream-functions.adoc
+++ /dev/null
@@ -1,515 +0,0 @@
-= {pulsar-reg} functions
-:navtitle: {pulsar-short} functions
-
-Functions are lightweight compute processes that enable you to process each message received on a topic.
-You can apply custom logic to that message, transforming or enriching it, and then output it to a different topic.
-
-Functions run inside Astra Streaming and are therefore serverless.
-You write the code for your function in Java, Python, or Go, then upload the code.
-It runs automatically for each message published to the specified input topic.
-
-Functions are implemented using https://pulsar.apache.org/docs/en/functions-overview/[{pulsar-reg} functions].
-
-[IMPORTANT]
-====
-Custom functions require a xref:astra-streaming:operations:astream-pricing.adoc[paid Astra Streaming plan].
-====
-
-== Deploy Python functions in a zip file
-
-Astra Streaming supports Python-based {pulsar-short} functions.
-These functions can be packaged in a zip file, and the same zip file can be deployed to either Astra Streaming or {pulsar-short}.
-
-To demonstrate this, the following steps create a function configuration YAML file, package all necessary function files as a zip archive, and then use the `pulsar-admin` CLI to deploy the zip.
-The configuration file defines the {pulsar-short} function options and parameters.
-
-[TIP]
-====
-For video demos of {pulsar-short} Python functions, see the *Five Minutes About {pulsar-short}* series:
-
-video::OCqxcNK0HEo[youtube, list=PL2g2h-wyI4SqeKH16czlcQ5x4Q_z-X7_m, height=445px,width=100%]
-====
-
-. 
Create a directory and subdirectories for your function zip archive with the following structure:
-+
-[source,plain]
-----
-/parent-directory
-  /python-code
-    /deps
-    /src
-----
-+
-For example, a function called `my-python-function` could have the following structure:
-+
-[source,plain]
-----
-/my-python-function
-  python-code/my-python-function.zip
-  python-code/deps/sh-1.12.14-py2.py3-none-any.whl
-  python-code/src/my-python-function.py
-----
-+
-The following commands create the necessary directories for a function called `demo-function`:
-+
-[source,bash]
-----
-mkdir demo-function
-mkdir demo-function/python-code
-mkdir demo-function/python-code/deps/
-mkdir demo-function/python-code/src/
-----
-
-. Create a Python file in the `/src` directory.
-For example:
-+
-[source,bash]
-----
-touch demo-function/python-code/src/demo-function.py
-----
-
-. Add your function code to your Python file.
-This example function adds an exclamation point to the end of each message:
-+
-[source,python]
-----
-from pulsar import Function
-
-class ExclamationFunction(Function):
-    def __init__(self):
-        pass
-
-    def process(self, input, context):
-        return input + '!'
-----
-
-. Add your function's dependencies, as wheel files, to the `demo-function/python-code/deps` directory.
-This example downloads the `pulsar-client` wheel.
-From within the `deps` directory, run:
-+
-[source,bash]
-----
-pip download pulsar-client==2.10.0 -d .
-----
-
-. Create the zip archive for your function.
-From within the `python-code` directory, package the `src` and `deps` directories into `demo-function.zip`:
-+
-[source,bash]
-----
-zip -r demo-function.zip src deps
-----
-+
-Wait while the archive is packaged.
-
-. 
Verify that the zip file is in the `python-code` directory:
-+
-[source,bash]
-----
-ls -al python-code
-----
-+
-.Result
-[%collapsible]
-====
-[source,console]
-----
-deps
-demo-function.zip
-src
-----
-====
-
-=== Deploy a Python function with configuration file
-
-. Create a deployment configuration file named `func-create-config.yaml` with the following contents.
-This file is passed to the `pulsar-admin` create function command.
-+
-[source,yaml,subs="+quotes"]
-----
-py: /absolute/path/to/demo-function.zip
-className: pythonfunc.ExclamationFunction
-parallelism: 1
-inputs:
-  - persistent://**TENANT_NAME**/**NAMESPACE_NAME**/**INPUT_TOPIC_NAME**
-output: persistent://**TENANT_NAME**/**NAMESPACE_NAME**/**OUTPUT_TOPIC_NAME**
-autoAck: true
-tenant: **TENANT_NAME**
-namespace: **NAMESPACE_NAME**
-name: demofunction
-logTopic:
-userConfig:
-  logging_level: ERROR
-----
-+
-Replace the following:
-+
-* `**TENANT_NAME**`: The tenant where you want to deploy the function
-* `**NAMESPACE_NAME**`: The namespace where you want to deploy the function
-* `**INPUT_TOPIC_NAME**`: The input topic for the function
-* `**OUTPUT_TOPIC_NAME**`: The output topic for the function
-
-. Use `pulsar-admin` to deploy the Python zip to Astra Streaming or {pulsar-short}.
-The following command assumes you've properly configured the `client.conf` file for `pulsar-admin` commands against your {pulsar-short} cluster.
-If you are using Astra Streaming, see xref:astra-streaming:developing:configure-pulsar-env.adoc[] for more information.
-+
-[source,console]
-----
-bin/pulsar-admin functions create --function-config-file /absolute/path/to/func-create-config.yaml
-----
-
-. Verify that the function was deployed:
-+
-* Go to the {astra-ui} to see your newly deployed function listed under the **Functions** tab for your tenant.
-See <> for more information on testing and monitoring your function in Astra Streaming.
-* Use the `pulsar-admin` CLI to list functions for a specific tenant and namespace:
-+
-[source,bash,subs="+quotes"]
-----
-bin/pulsar-admin functions list --tenant **TENANT_NAME** --namespace **NAMESPACE_NAME**
-----
-
-== Deploy Java functions in a JAR file
-
-Astra Streaming supports Java-based {pulsar-short} functions, which are packaged in a JAR file.
-The same JAR file can be deployed to either Astra Streaming or {pulsar-short}.
-
-In this example, you'll create a function JAR file using Maven, then use the `pulsar-admin` CLI to deploy the JAR.
-You'll also create a function configuration YAML file that defines the {pulsar-short} function options and parameters.
-
-. Create a properly-structured JAR with your function's Java code.
-For example:
-+
-.Example: Function pom.xml
-[%collapsible]
-====
-[source,xml]
-----
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>java-function</groupId>
-  <artifactId>java-function</artifactId>
-  <version>1.0-SNAPSHOT</version>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-functions-api</artifactId>
-      <version>3.0.0</version>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <appendAssemblyId>false</appendAssemblyId>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-          <archive>
-            <manifest>
-              <mainClass>org.example.test.ExclamationFunction</mainClass>
-            </manifest>
-          </archive>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.11.0</version>
-        <configuration>
-          <release>17</release>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>
-----
-====
-
-. Package the JAR file with Maven:
-+
-[source,bash]
-----
-mvn package
-----
-+
-.Result
-[%collapsible]
-====
-[source,console]
-----
-[INFO] ------------------------------------------------------------------------
-[INFO] BUILD SUCCESS
-[INFO] ------------------------------------------------------------------------
-[INFO] Total time:  10.989 s
-[INFO] Finished at: 2023-05-16T16:19:05-04:00
-[INFO] ------------------------------------------------------------------------
-----
-====
-
-. Create a deployment configuration file named `func-create-config.yaml` with the following contents.
-This file is passed to the `pulsar-admin` create function command.
-+
-[source,yaml]
-----
-jar: /absolute/path/to/java-function.jar
-className: org.example.test.ExclamationFunction
-parallelism: 1
-inputs:
-  - persistent://mytenant/ns0/t1
-output: persistent://mytenant/ns0/t2
-autoAck: true
-tenant: mytenant
-namespace: ns0
-name: testjarfunction
-logTopic:
-userConfig:
-  logging_level: ERROR
-----
-+
-[IMPORTANT]
-====
-Astra Streaming requires the `inputs` topic to have a message schema defined before deploying the function.
-Otherwise, deployment errors may occur.
-Use the {astra-ui} to define the message schema for a topic.
-====
-
-. Use the `pulsar-admin` CLI to deploy your function JAR to Astra Streaming or {pulsar-short}.
-+
-The following command assumes you've properly configured the `client.conf` file for `pulsar-admin` commands against your {pulsar-short} cluster.
-If you are using Astra Streaming, see xref:astra-streaming:developing:configure-pulsar-env.adoc[] for more information.
-+
-[source,bash]
-----
-bin/pulsar-admin functions create --function-config-file /absolute/path/to/func-create-config.yaml
-----
-
-. Verify that the function was deployed:
-+
-* Go to the {astra-ui} to see your newly deployed function listed under the **Functions** tab for your tenant.
-See <> for more information on testing and monitoring your function in Astra Streaming.
-* Use the `pulsar-admin` CLI to list functions for a specific tenant and namespace:
-+
-[source,bash,subs="+quotes"]
-----
-bin/pulsar-admin functions list --tenant **TENANT_NAME** --namespace **NAMESPACE_NAME**
-----
-
-== Add functions in Astra Streaming dashboard
-
-Add functions in the **Functions** tab of the Astra Streaming dashboard.
-
-. Select *Create Function* to get started.
-
-. Choose your function name and namespace.
-+
-image::astream-name-function.png[Function and Namespace]
-
-. Select the file you want to pull the function from and which function you want to use within that file.
-Astra Streaming generates a list of acceptable classes.
-+
-image::astream-exclamation-function.png[Exclamation Function]
-+
-There are differences depending on the function language:
-+
-* Python functions are added by loading a Python file (`.py`) or a zipped Python file (`.zip`).
-+
-When adding Python files, the Class Name is specified as the name of the Python file without the extension plus the class you want to execute.
-For example, if the Python file is called `testfunction.py` and the class is `ExclamationFunction`, then the class name is `testfunction.ExclamationFunction`.
-+
-The file can contain multiple classes, but only one is used.
-If there is no class in the Python file (when using a basic function, for example), specify the filename without the extension, such as `testfunction`.
-+
-* Java functions are added by loading a Java jar file (`.jar`).
-When adding Java files, you must specify the name of the class to execute as the function.
-
-. Select your input topics.
-+
-image:streaming-learning:functions:astream-io-topics.png[IO Topics]
-
-. Select **Optional Destination Topics** for output and logging.
-+
-image:streaming-learning:functions:astream-optional-destination-topics.png[Optional Topics]
-
-. If applicable, configure the *Advanced Options*.
-+
-image:streaming-learning:functions:astream-advanced-config.png[Advanced Configuration]
-
-. Run at least one function instance.
-
-. Select an option for *Processing Guarantee*:
-+
-* *ATLEAST_ONCE* (default): Each message sent to the function is processed at least once, and can be processed more than once.
-* *ATMOST_ONCE*: Each message sent to the function is processed at most once, so there is a chance that the message is not processed.
-* *EFFECTIVELY_ONCE*: Each message sent to the function has exactly one output associated with it.
-
-. Provide an *Optional Configuration Key*.
-See the https://pulsar.apache.org/functions-rest-api/#operation/registerFunction[{pulsar-short} documentation] for a list of configuration keys.
-+
-image:streaming-learning:functions:astream-provide-config-keys.png[Provide Config Key]
-
-. Click *Create*.
-
-. To verify that the function was created, review the list of functions on the *Functions* tab.
-
-== Add function with {pulsar-short} CLI
-
-You can add functions using the {pulsar-short} CLI.
-
-The following example creates a Python function that consumes a message from one topic, adds an exclamation point, and then publishes the results to another topic.
-
-. Add the following Python function code to a file named `testfunction.py`:
-+
-.testfunction.py
-[source,python]
-----
-from pulsar import Function
-
-class ExclamationFunction(Function):
-    def __init__(self):
-        pass
-
-    def process(self, input, context):
-        return input + '!'
-----
-
-. Deploy `testfunction.py` to your {pulsar-short} cluster using the {pulsar-short} CLI:
-+
-[source,bash,subs="+quotes"]
-----
-$ ./pulsar-admin functions create \
-  --py /absolute/path/to/testfunction.py \
-  --classname testfunction.ExclamationFunction \
-  --tenant **TENANT_NAME** \
-  --namespace default \
-  --name exclamation \
-  --auto-ack true \
-  --inputs persistent://**TENANT_NAME**/default/in \
-  --output persistent://**TENANT_NAME**/default/out \
-  --log-topic persistent://**TENANT_NAME**/default/log
-----
-+
-Replace `**TENANT_NAME**` with the name of the tenant where you want to deploy the function.
-If you want to use a different namespace, replace `default` with another namespace name.
-If you want to use different topics, change `in`, `out`, and `log` accordingly.
-
-. Verify that the response is `Created Successfully!`.
-This indicates that the function is deployed and ready to run when triggered by incoming messages.
-+
-If the response is `402 Payment Required` with `Reason: only qualified organizations can create functions`, then you must upgrade to a xref:astra-streaming:operations:astream-pricing.adoc[paid Astra Streaming plan].
-+
-You can also verify that a function was created by checking the **Functions** tab or by running `./pulsar-admin functions list --tenant **TENANT_NAME**`.
-
-== Testing Your Function
-
-Triggering a function is a convenient way to test that it is working.
-When you trigger a function, you publish a message on the function's input topic, and the function runs against that message.
-
-. Listen for messages on the output topic:
-+
-[source,bash,subs="+quotes"]
-----
-$ ./pulsar-client consume persistent://**TENANT_NAME**/default/out \
-  --subscription-name my-subscription \
-  --num-messages 0 # Listen indefinitely
-----
-+
-Replace `**TENANT_NAME**` with the name of the tenant where you deployed the function.
-If your function uses a different namespace and output topic name, replace `default` and `out` accordingly.
-+
-If the function returns data to its output topic, then that data is printed by this consumer when the function runs.
-
-. Send a test value with the {pulsar-short} CLI `trigger` command:
-+
-[source,bash,subs="+quotes"]
-----
-$ ./pulsar-admin functions trigger \
-  --name exclamation \
-  --tenant **TENANT_NAME** \
-  --namespace default \
-  --trigger-value "Hello world"
-----
-+
-This command sends the string `Hello world` to the exclamation function.
-If deployed and configured correctly, the function should output `Hello world!` to the `out` topic.
-
-[#controlling-your-function]
-== Controlling Your Function
-
-You can start, stop, and restart your function by selecting it in the *Functions* dashboard.
-
-image:streaming-learning:functions:astream-function-controls.png[Function Controls]
-
-== Monitoring Your Function
-
-Functions produce logs to help you debug.
-To view your function's logs, open your function in the *Functions* dashboard.
-
-image:streaming-learning:functions:astream-function-log.png[Function Log]
-
-In the upper right corner of the function log are controls to *Refresh*, *Copy to Clipboard*, and *Save* your function log.
-
-== Updating Your Function
-
-A function that is already running can be updated with new configuration.
-The following settings can be updated:
-
-* Function code
-* Output topic
-* Log topic
-* Number of instances
-* Configuration keys
-
-If you need to update any other setting of the function, delete and then re-add the function.
-
-. To update your function, select the function in the *Functions* dashboard.
-+
-image::astream-function-update.png[Update Function]
-
-. Click *Change File* to select a local function file, and then click *Open*.
-
-. Update your function's *Instances* and *Timeout*.
-
-. Click *Update*.
-+
-An *Updates Submitted Successfully* message confirms that the function was updated.
-
-== Deleting Your Function
-
-. Select the function to be deleted in the *Functions* dashboard.
-+
-image::astream-delete-function.png[Delete Function]
-
-. Click *Delete*.
-
-. To confirm the deletion, enter the function's name, and then click *Delete*.
-+
-A *Function-name Deleted Successfully!* message confirms the function was permanently deleted.
-
-== Next steps
-
-To learn more about developing functions for Astra Streaming and {pulsar-short}, see the https://pulsar.apache.org/docs/en/functions-develop/[{pulsar-short} functions documentation].
\ No newline at end of file
diff --git a/modules/functions/pages/cast.adoc b/modules/functions/pages/cast.adoc
deleted file mode 100644
index 0a21815..0000000
--- a/modules/functions/pages/cast.adoc
+++ /dev/null
@@ -1,51 +0,0 @@
-= Cast
-:functionName: cast
-
-The cast transform function transforms data to a compatible target schema.
-
-The `step` name is `cast`, and the `UserConfig` is controlled here: `{"steps": [{"type": "cast", "schema-type": "STRING"}]}`.
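The effect of a `STRING` cast can be sketched outside {pulsar-short} as plain JSON serialization. The following is a minimal sketch, assuming a record already decoded into a Python `dict`; the `cast_record_to_string` helper is hypothetical and only approximates the step's behavior, not the transform's actual implementation:

```python
import json

def cast_record_to_string(record: dict) -> str:
    # Hypothetical helper: approximates the cast step's STRING schema-type
    # by re-emitting a structured record as its JSON text form.
    return json.dumps(record)

print(cast_record_to_string({"field1": "value1", "field2": "value2"}))
```

In the actual transform, the output message's schema changes from AVRO to STRING; the payload text is what this sketch produces.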
-
-== Parameters:
-
-[cols=2*,options=header]
-|===
-|*Parameter*
-|*Description*
-
-|schema-type
-|The target schema type. Only `STRING` is available.
-
-|part
-|When used with KeyValue data, defines whether the transform is applied to the `key` or the `value`. If `null` or absent, the transform applies to both the key and the value.
-|===
-
-== Example:
-
-. Produce an AVRO message with the payload: `{field1: value1, field2: value2}`:
-+
-[tabs]
-====
-AVRO::
-+
---
-[source,json,subs="attributes+"]
-----
-{"field1": "value1", "field2": "value2"}
-----
---
-
-Result::
-+
---
-[source,json,subs="attributes+"]
-----
-{"field1": "value1", "field2": "value2"}
-----
---
-====
-
-. The function casts the value to its `STRING` representation: `{"field1": "value1", "field2": "value2"}`.
-
-== What's next?
-
-For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation].
diff --git a/modules/functions/pages/compute.adoc b/modules/functions/pages/compute.adoc
deleted file mode 100644
index 1e1518b..0000000
--- a/modules/functions/pages/compute.adoc
+++ /dev/null
@@ -1,326 +0,0 @@
-= Compute
-:functionName: compute
-
-The `compute` transform function computes field values based on an `expression` evaluated at runtime. +
-If the field already exists, it is overwritten. +
-The step name is `compute` and the function's `UserConfig` is controlled in this step: +
-[source,json]
-----
-{"steps": [{"type": "compute", "fields":[
-    {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
-    {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]}
-]}
-----
-
-== Parameters:
-
-[cols=2*,options=header]
-|===
-|*Parameter*
-|*Description*
-
-|fields
-|An array of JSON objects describing how to calculate the field values. Each JSON object represents a `field`. See <>.
-
-|===
-
-[#field-params]
-=== Field parameters
-
-[cols=2*,options=header]
-|===
-| *Name (field)*
-| *Description*
-
-| name
-| The name of the field to be computed. Prefix with `key.` or `value.` to compute the fields in the key or value parts of the message. +
-You can also compute values on the following message headers: [`destinationTopic`, `messageKey`, `properties.`]. +
-Note that `properties` is a map of key/value pairs that are referenced by dot notation, for example `properties.key0`.
-
-| expression
-| Supports the <> syntax. It is evaluated at runtime and the result of the evaluation is assigned to the field.
-
-| type
-| The type of the computed field. This translates to the schema type of the new field in the transformed message. The following types are currently supported: [`STRING`, `INT32`, `INT64`, `FLOAT`, `DOUBLE`, `BOOLEAN`, `DATE`, `TIME`, `DATETIME`]. See <>.
-
-| optional (default: true)
-|If true, marks the field as optional in the schema of the transformed message. This is useful when `null` is a possible value of the compute expression.
-
-|===
-
-[#type-params]
-=== Type parameters
-
-[cols=3*,options=header]
-|===
-| *Name (field.type)*
-| *Description*
-| *Expression Examples*
-
-| `INT32`
-| Represents a 32-bit integer.
-| expression1: "2147483647", expression2: "1 + 1"
-
-| `INT64`
-| Represents a 64-bit integer.
-| expression1: "9223372036854775807", expression2: "1 + 1"
-
-| `FLOAT`
-| Represents a 32-bit floating point.
-| expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1"
-
-| `DOUBLE`
-| Represents a 64-bit floating point.
| expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1"
-
-| `BOOLEAN`
-| true or false
-| expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'"
-
-| `DATE`
-| A date without a time-zone in the https://www.rfc-editor.org/rfc/rfc3339[RFC3339 format]
-| expression1: "2021-12-03"
-
-| `TIME`
-| A time without a time-zone in the https://www.rfc-editor.org/rfc/rfc3339[RFC3339 format]
-| expression1: "20:15:45"
-
-| `DATETIME`
-| A date-time with an offset from UTC in the https://www.rfc-editor.org/rfc/rfc3339[RFC3339 format]
-| expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()"
-
-|===
-
-[#expression-language]
-== Expression language
-
-{pulsar-short} transforms use a simple expression language to evaluate conditional steps and compute steps. +
-The expression language uses dot notation to access field properties or map keys, and supports the following operations and functions: +
-
-=== Operators
-The expression language supports the following operators:
-[cols=2*,options=header]
-|===
-| *Category*
-| *Supported operators*
-
-|Arithmetic
-| +, - (binary), *, / and div, % and mod, - (unary)
-
-|Logical
-|and, &&, or \|\|, not, !
-
-|Relational
-|==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge
-|===
-
-=== Functions
-
-Utility methods are available under the `fn` namespace. For example, to get the current epoch millis, use `fn:now()`. +
-The expression language supports the following functions: +
-
-[cols=2*,options=header]
-|===
-| *Name (field)*
-| *Description*
-|uppercase(input)
-|Converts a string to uppercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null.
-
-|lowercase(input)
-|Converts a string to lowercase. If input is not a string, it attempts a string conversion. If the input is null, it returns null.
-
-|contains(input, value)
-|Returns true if value exists in input.
It attempts string conversion on both input and value if either is not a string. If input or value is null, it returns false.
-
-|trim(input)
-|Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion.
-
-|concat(input1, input2)
-|Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string.
-
-|coalesce(value, valueIfNull)
-|Returns value if it is not null, otherwise returns valueIfNull.
-
-|now()
-|Returns the current epoch millis.
-
-|dateadd(input, delta, unit)
-|Performs date/time arithmetic operations on the input date/time. +
-`input` can be either epoch millis or an RFC3339 format like "2022-10-14T10:15:30+01:00" +
-`delta` is the amount of unit to add to input. Can be a negative value to perform subtraction. +
-`unit` is the unit of time to add or subtract. Can be one of `[years, months, days, hours, minutes, seconds, millis]`.
-|===
-
-=== Conditional Steps
-Each `step` accepts an optional `when` configuration that is evaluated at step execution time against the current record (the record as it enters that step of the transform pipeline). +
-The `when` condition supports the <<expression-language,expression language>>, which provides access to the record attributes as follows:
-
-[cols=2*,options=header]
-|===
-| *Name (field)*
-| *Description*
-|key:
-|the key portion of the record in a KeyValue schema.
-
-|value:
-|the value portion of the record in a KeyValue schema, or the message payload itself.
-
-|messageKey:
-|the optional key messages are tagged with (aka. Partition Key).
-
-|topicName:
-|the optional name of the topic which the record originated from (aka. Input Topic).
-
-|destinationTopic:
-|the name of the topic on which the transformed record will be sent (aka. Output Topic).
-
-|eventTime:
-|the optional timestamp attached to the record from its source. For example, the original timestamp attached to the {pulsar-short} message.
-
-|properties:
-|the optional user-defined properties attached to the record.
-
-|===
-
-You can use the `.` operator to access top level or nested properties on a schema-full key or value. +
-For example, `key.keyField1` or `value.valueField1.nestedValueField`. +
-
-==== `When` example
-
-For this KeyValue record:
-
-[source,json]
-----
-{
-  "key": {
-    "compound": {
-      "uuid": "uuidValue",
-      "timestamp": 1663616014
-    }
-  },
-  "value" : {
-    "first" : "f1",
-    "last" : "l1",
-    "rank" : 1,
-    "address" : {
-      "zipcode" : "abc-def"
-    }
-  }
-}
-----
-
-These `when` statements evaluate as follows:
-
-[cols=2*,options=header]
-|===
-| *`when` statement*
-| *Evaluates to:*
-
-|key.compound.uuid == 'uuidValue'
-|True
-
-|key.compound.timestamp \<= 10
-|False
-
-|value.first == 'f1' && value.last.toUpperCase() == 'L1'
-|True
-
-|value.rank \<= 1 && value.address.substring(0, 3) == 'abc'
-|True
-
-|===
-
-== Multiply and concatenate example
-
-. Create a `compute` transform function with the {pulsar-short} admin CLI:
-+
-[source,shell]
-----
-./bin/pulsar-admin functions create \
---tenant ${TENANT} \
---namespace ${NAMESPACE} \
---name transform-function \
---inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
---output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
---classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
---jar functions/pulsar-transformations-2.0.1.nar \
---transform-function-config '{"steps": [{"type": "compute", "fields":[
-    {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
-    {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '\''_suffix'\'')", "type": "STRING"}]}
-]}'
-----
-
-. 
Produce an AVRO message with the payload:
-+
-[tabs]
-====
-AVRO::
-+
---
-[source,subs="attributes+"]
-----
-{key={keyField: key}, value={valueField: value}} (KeyValue)
-----
---
-
-Result::
-+
---
-[source,subs="attributes+"]
-----
-{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue)
-----
---
-====
-
-. The function applies preprocessing to outgoing messages, in this case performing multiplication and concatenation operations to output `{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue)` to your output topic.
-
-== Message routing example
-
-. Create a `compute` transform function with the {pulsar-short} admin CLI:
-+
-[source,shell]
-----
-./bin/pulsar-admin functions create \
---tenant ${TENANT} \
---namespace ${NAMESPACE} \
---name transform-function \
---inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
---output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
---classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
---jar functions/pulsar-transformations-2.0.1.nar \
---transform-function-config '{"steps": [{"type": "compute", "fields":[
-    {"name": "destinationTopic", "expression" : "'\''routed'\''", "type": "STRING"},
-    {"name": "properties.k1", "expression" : "'\''overwritten'\''", "type": "STRING"},
-    {"name": "properties.k2", "expression" : "'\''new'\''", "type": "STRING"}]}
-]}'
-----
-
-. Produce an AVRO message with the following payload:
-+
-[tabs]
-====
-AVRO::
-+
---
-[source,subs="attributes+"]
-----
-{key={keyField: key}, value={valueField: value}} (KeyValue), headers=destinationTopic: out1, properties: {k1:v1}
-----
---
-
-Result::
-+
---
-[source,subs="attributes+"]
-----
-{key={keyField: key}, value={valueField: value}} (KeyValue), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
-----
---
-====
-
-. 
The function applies preprocessing to outgoing messages, in this case re-routing the destination topic with the output `{key={keyField: key}, value={valueField: value}} (KeyValue), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}`.
-
-== What's next?
-
-For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation].
\ No newline at end of file
diff --git a/modules/functions/pages/deploy-in-sink.adoc b/modules/functions/pages/deploy-in-sink.adoc
deleted file mode 100644
index 5e6f718..0000000
--- a/modules/functions/pages/deploy-in-sink.adoc
+++ /dev/null
@@ -1,58 +0,0 @@
-= Deploy transform function in sink
-
-With modern {pulsar-short} versions, transform functions can be deployed inside a sink process. +
-Before this update, functions transformed data either after it was written to a topic by a source connector, or before it was read from a topic by a sink connector. +
-This required either an intermediate topic, with additional storage, IO, and latency, or a custom connector. +
-Now, functions can be deployed at sink creation and apply preprocessing to sink topic writes. +
-
-== Create sink function in Astra Streaming
-
-Creating a sink function is similar to creating a sink in the {astra-ui}, but with a few additional steps.
-
-. xref:pulsar-io:connectors/index.adoc[Create a sink] as described in the Astra Streaming documentation.
-
-. During sink creation, select the transform function you want to run inside the sink.
-+
-image::astream-transform-functions.png[Connect Topics]
-
-. When the sink is up and running, inspect the sink connector's log.
-The function is loaded at sink creation:
-+
-[source,console]
-----
-2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc
-----
-
-. 
The function then applies preprocessing to outgoing messages, in this case casting the AVRO record to a `STRING` schema before writing to your selected topic: -+ -[source,json] ----- -{"field1": "value1", "field2": "value2"} ----- - -== Create sink function in pulsar-admin - -https://github.com/datastax/pulsar[Luna Streaming 2.10+] is required to deploy custom functions in {pulsar-short}. - -Create a sink connector, and include the path to the transform function and configuration at creation: - -[source,shell] ----- -pulsar-admin sinks create \ ---sink-type elastic-sink \ ---inputs my-input-topic \ ---tenant public \ ---namespace default \ ---name my-sink \ ---transform-function "builtin://transforms" \ ---transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}' ----- - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. - - - - - diff --git a/modules/functions/pages/drop-fields.adoc b/modules/functions/pages/drop-fields.adoc deleted file mode 100644 index c9baf3c..0000000 --- a/modules/functions/pages/drop-fields.adoc +++ /dev/null @@ -1,52 +0,0 @@ - -= Drop fields -:functionName: drop-fields - -The {functionName} transform function drops fields of structured data (currently, only AVRO is supported). - -The step name is `drop-fields`, and the `UserConfig` is controlled here: `{"steps": [{"type": "drop-fields", "fields": "password,other"}]}`. - -== Parameters: -[cols=2*,options=header] -|=== -|*Parameter* -|*Description* - -|fields -|A comma-separated list of fields to drop. - -|part -|When used with KeyValue data, defines whether the transform function is applied to the `key` or the `value`. If `null` or absent, the transform function applies to both the key and the value. -|=== - -== Example - -. 
Produce an AVRO message with the payload: `{name: value1, password: value2}`. -+ -[tabs] -==== -AVRO:: -+ --- -[source,json,subs="attributes+"] ----- -{"name": "value1", "password": "value2"} ----- --- - -Result:: -+ --- -[source,json,subs="attributes+"] ----- -{"name": "value1"} ----- --- -==== -. The function drops the fields specified in the `fields` parameter, and outputs `{name: value1}` to the output topic. - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. \ No newline at end of file diff --git a/modules/functions/pages/drop.adoc b/modules/functions/pages/drop.adoc deleted file mode 100644 index 9161db8..0000000 --- a/modules/functions/pages/drop.adoc +++ /dev/null @@ -1,27 +0,0 @@ -= Drop -:functionName: drop - -The {functionName} transform function drops a record from further processing. + -Use it in conjunction with `when` to selectively drop records. + -The step name is `drop`, and the `UserConfig` is controlled here: `{"steps": [{"type": "drop", "when": "value.firstName == value1"}]}`. - -== Parameters: -[cols=2*,options=header] -|=== -|*Parameter* -|*Description* - -|when -|By default, every record is dropped. Set this parameter to a condition that selects which records to drop. - -|=== - -== Example - -. Produce an AVRO message with the payload: `{firstName: value1, lastName: value2}`. -. The function drops records matching the `when` statement `value.firstName == value1`. + -There is no output. - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. 
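The drop step's filtering can be sketched in Python. This is a hypothetical illustration of the behavior described above, not the actual `TransformFunction` implementation; the Python predicate stands in for the `when` expression.

```python
# Hypothetical sketch of the drop step: records matching the `when`
# predicate are removed from the stream and never reach the output topic.
# Illustration only, not the actual TransformFunction implementation.

def drop_step(records, when):
    """Return only the records for which the `when` predicate is false."""
    return [record for record in records if not when(record)]

records = [
    {"firstName": "value1", "lastName": "value2"},
    {"firstName": "other", "lastName": "value3"},
]

# Mirrors the config {"type": "drop", "when": "value.firstName == value1"}.
kept = drop_step(records, lambda r: r["firstName"] == "value1")
print(kept)  # [{'firstName': 'other', 'lastName': 'value3'}]
```

Only the non-matching record survives, which matches the "There is no output" result when every produced message satisfies the condition.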
\ No newline at end of file diff --git a/modules/functions/pages/flatten.adoc b/modules/functions/pages/flatten.adoc deleted file mode 100644 index 7b23574..0000000 --- a/modules/functions/pages/flatten.adoc +++ /dev/null @@ -1,49 +0,0 @@ -= Flatten -:functionName: flatten - -The {functionName} transform function converts nested structured data into structured data with a single hierarchy level. + -The names of the new fields are built by concatenating the intermediate-level field names. + -The step name is `flatten`, and the `UserConfig` is controlled here: `{"steps": [{"type": "flatten"}]}`. - -== Parameters: -[cols=2*,options=header] -|=== -|*Parameter* -|*Description* - -|delimiter -|The delimiter to use when concatenating the field names (default: `_`). - -|part -|When used with KeyValue data, defines whether the transform function is applied to the `key` or the `value`. If `null` or absent, the transform function applies to both the key and the value. -|=== - -== Example - -. Produce an AVRO message with the payload: `{field1: {field11: value11, field12: value12}}`. -+ -[tabs] -==== -AVRO:: -+ --- -[source,json,subs="attributes+"] ----- -{"field1": {"field11": "value11", "field12": "value12"}} ----- --- - -Result:: -+ --- -[source,json,subs="attributes+"] ----- -{"field1_field11": "value11", "field1_field12": "value12"} ----- --- -==== -. The function flattens the nested fields, joining the field names with the default `_` delimiter, and outputs `{field1_field11: value11, field1_field12: value12}` to the output topic. - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. 
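The flattening behavior above can be sketched in Python. This is a hypothetical illustration of the name-concatenation rule, not the actual `TransformFunction` implementation.

```python
# Hypothetical sketch of the flatten step: nested field names are joined
# with a delimiter to produce a single-level record. Illustration only,
# not the actual TransformFunction implementation.

def flatten(record, delimiter="_", prefix=""):
    """Recursively flatten nested dicts, concatenating level names."""
    flat = {}
    for name, value in record.items():
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, key))
        else:
            flat[key] = value
    return flat

print(flatten({"field1": {"field11": "value11", "field12": "value12"}}))
# {'field1_field11': 'value11', 'field1_field12': 'value12'}
```

Passing a different `delimiter` argument corresponds to setting the `delimiter` parameter in the step configuration.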
\ No newline at end of file diff --git a/modules/functions/pages/index.adoc b/modules/functions/pages/index.adoc deleted file mode 100644 index 44485b9..0000000 --- a/modules/functions/pages/index.adoc +++ /dev/null @@ -1,210 +0,0 @@ -= Transform Functions -:navtitle: Overview - -A {pulsar-short} *transform function* is a low-code implementation of xref:astra-streaming:developing:astream-functions.adoc[{pulsar-short} functions]. + -Functions receive data from one or more input topics, apply user-supplied processing, and publish the results to another topic. + -Custom functions are a powerful feature, but for common data transformations, we now include *Transform Functions*. -Drop fields, flatten, compute, and more without coding or deep schema knowledge. -{company} has created the following transform functions for common data transforms, but we're always experimenting with new ones. -Check back as the list grows, or let us know some functions you'd find helpful in your deployment. - -[#transform-list] -== Transforms - -* **Cast**: The xref:cast.adoc[cast transform function] modifies the key or value schema to a target compatible schema. -* **Compute**: The xref:compute.adoc[compute transform function] computes new field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten. -* **Drop-fields**: The xref:drop-fields.adoc[drop-fields transform function] drops fields from structured data. -* **Drop**: The xref:drop.adoc[drop transform function] drops a record from further processing. -* **Flatten**: The xref:flatten.adoc[flatten transform function] flattens structured data. -* **Merge KeyValue**: The xref:merge-key-value.adoc[merge KeyValue transform function] merges the fields of KeyValue records where both the key and value are structured data with the same schema type. 
-* **Unwrap KeyValue**: The xref:unwrap-key-value.adoc[unwrap KeyValue transform function] extracts the KeyValue's key or value, and then makes it the record value if the record is a KeyValue. - -[#transform-config] -== Configuration - -The `TransformFunction` reads its configuration as `JSON` from the Function `userConfig` parameter in the format: - -[source,json] ----- -{ - "steps": [ - { - "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key" - }, - { - "type": "merge-key-value" - }, - { - "type": "unwrap-key-value" - }, - { - "type": "cast", "schema-type": "STRING" - } - ] -} ----- - -Transform functions are performed in the order in which they appear in the `steps` array. -Each step is defined by its `type` and uses its own arguments. -Each step can be dynamically toggled on or off by supplying a `when` condition that evaluates to true or false. - -For example, if the previous configuration is applied to a `KeyValue` input record, the following transformed values are returned after each step: - -[source,avro] ----- -# Original input record -{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} - -# Transformations -(KeyValue) - | - | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key" - | -{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue) - | - | "type": "merge-key-value" - | -{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue) - | - | "type": "unwrap-key-value" - | -{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO) - | - | "type": "cast", "schema-type": "STRING" - | -{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING) ----- - -[#deploy-cli] -== Deploy with {pulsar-short} CLI - -https://github.com/datastax/pulsar[Luna Streaming 2.10+] is 
required to deploy custom functions in {pulsar-short}. - -The transform function `.nar` lives in the `/functions` directory of your {pulsar-short} deployment. - -[tabs] -====== -{pulsar-short} standalone:: -+ --- -To deploy the built-in transform function locally in {pulsar-short} standalone, do the following: - -. Start {pulsar-short} standalone: -+ -[source,shell] ----- -./bin/pulsar standalone ----- - -. Create a transform function in `localrun` mode: -+ -[source,shell,subs="attributes+"] ----- -./bin/pulsar-admin functions localrun \ ---jar functions/pulsar-transformations-2.0.1.nar \ ---classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \ ---inputs my-input-topic \ ---output my-output-topic \ ---user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}' ----- --- - -{pulsar-short} cluster:: -+ --- -To deploy a built-in transform function to a {pulsar-short} cluster, do the following: - -. Create a built-in transform function with the {pulsar-short} CLI: -+ -[source,shell] ----- -./bin/pulsar-admin functions create \ ---tenant $TENANT \ ---namespace $NAMESPACE \ ---name transform-function \ ---inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \ ---output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \ ---classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \ ---jar functions/pulsar-transformations-2.0.1.nar ----- -+ -.Result -[%collapsible] -==== -[source,console] ----- -Created successfully ----- -==== - -. Confirm your function has been created: -+ -[source,shell] ----- -./bin/pulsar-admin functions list --tenant $TENANT ----- -+ -.Result -[%collapsible] -==== -[source,console] ----- -cast-function -flatten-function -transform-function -transform-function-2 ----- -==== --- -====== - -[#deploy-as] -== Deploy with Astra Streaming - -Deploy transform functions in the *Functions* tab of the {astra-ui}. 
- -The process is similar to xref:astra-streaming:developing:astream-functions.adoc[creating a function in the {astra-ui}], but with a few additional steps. - -. After naming your new function, select the *Use {company} transform function* option. - -. Select a transform function from the list of available functions: -+ -image::astream-transform-functions.png[Connect Topics] - -. Select the transform function's namespace and input topic(s). - -. Select the transform function's namespace, output topic, and log topic. -+ -The log topic is a separate output topic for messages containing additional `loglevel`, `fqn`, and `instance` properties. - -. Specify advanced configuration options, if applicable. - -. Pass JSON configuration values with your function, if applicable. -+ -For more, see the transform function <<transform-config,configuration>> section. - -. Select *Create*. -The transform function initializes and begins processing messages. - -. Confirm your function has been created with the {pulsar-short} CLI: -+ -[source,shell] ----- -./bin/pulsar-admin functions list --tenant $TENANT ----- -+ -.Result -[%collapsible] -==== -[source,console] ----- -cast-function -flatten-function -transform-function -transform-function-2 ----- -==== - -== See also - -* xref:astra-streaming:developing:astream-functions.adoc[] -* https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation] \ No newline at end of file diff --git a/modules/functions/pages/merge-key-value.adoc b/modules/functions/pages/merge-key-value.adoc deleted file mode 100644 index 8c06acb..0000000 --- a/modules/functions/pages/merge-key-value.adoc +++ /dev/null @@ -1,35 +0,0 @@ -= Merge KeyValue -:functionName: merge-key-value - -The {functionName} transform function merges the fields of KeyValue records where both the key and value are structured types of the same schema type (currently, only AVRO is supported).
+ -The step name is `merge-key-value`, and the `UserConfig` is controlled here: `{"steps": [{"type": "merge-key-value"}]}`. - -== Example - -. Produce an AVRO message with the payload: `{key={keyField: key}, value={valueField: value}}`. -+ -[tabs] -==== -AVRO:: -+ --- -[source,,subs="attributes+"] ----- -{key={keyField: key}, value={valueField: value}} ----- --- - -Result:: -+ --- -[source,,subs="attributes+"] ----- -{key={keyField: key}, value={keyField: key, valueField: value}} ----- --- -==== -. The function merges the key fields into the value (because both the key and value are AVRO) and outputs `{key={keyField: key}, value={keyField: key, valueField: value}}` to the output topic. - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. \ No newline at end of file diff --git a/modules/functions/pages/unwrap-key-value.adoc b/modules/functions/pages/unwrap-key-value.adoc deleted file mode 100644 index 3dc87df..0000000 --- a/modules/functions/pages/unwrap-key-value.adoc +++ /dev/null @@ -1,46 +0,0 @@ -= Unwrap KeyValue -:functionName: unwrap-key-value - -If the record value is a KeyValue, the {functionName} transform function extracts the KeyValue's key or value and makes it the record value. + -The step name is `unwrap-key-value`, and the `UserConfig` is controlled here: `{"steps": [{"type": "unwrap-key-value"}]}`. - -== Parameters: -[cols=2*,options=header] -|=== -|*Parameter* -|*Description* - -|unwrapKey -|By default, the KeyValue's value is unwrapped. Set this parameter to `true` to unwrap the key instead. - -|=== - -== Example - -. Produce an AVRO message with the payload: `{key={keyField: key}, value={valueField: value}}`. -+ -[tabs] -==== -AVRO:: -+ --- -[source,,subs="attributes+"] ----- -{key={keyField: key}, value={valueField: value}} ----- --- - -Result:: -+ --- -[source,json,subs="attributes+"] ----- -{"valueField": "value"} ----- --- -==== -. 
The function extracts the KeyValue's value (the default behavior) and outputs `{valueField: value}` to the output topic. - -== What's next? - -For more, see xref:index.adoc[] or the https://pulsar.apache.org/docs/functions-overview[{pulsar-short} documentation]. \ No newline at end of file
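The unwrap behavior can be sketched in Python. This is a hypothetical illustration of the `unwrapKey` parameter's effect, not the actual `TransformFunction` implementation.

```python
# Hypothetical sketch of the unwrap-key-value step: the key or value of a
# KeyValue record becomes the new record value. Illustration only, not
# the actual TransformFunction implementation.

def unwrap_key_value(key, value, unwrap_key=False):
    """Return the key when unwrap_key is true, otherwise the value."""
    return key if unwrap_key else value

# Default behavior: the KeyValue's value becomes the record value,
# matching the example result above.
print(unwrap_key_value({"keyField": "key"}, {"valueField": "value"}))
# {'valueField': 'value'}
```

Setting `unwrap_key=True` corresponds to the `unwrapKey: true` configuration, which promotes the key instead.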