
Temporal as Data Orchestrator


These are just some notes and a bit of advice based on my experience of using Temporal as a data orchestrator for my ETL pipeline. Initially I felt it was just the same as other orchestrators like Airflow, Prefect, or Dagster, but I learned the hard way that it is not, because of the concept of durable execution.

Durable execution?

It means that the process, function, or method you want to run will never fail (it actually can fail, up to the maximum number of attempts you decide to set). What "never failing" means here is that it retries with exponential backoff to ensure the process succeeds. Even if it does fail, it can continue from the last state or checkpoint it reached within the workflow.
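As a rough illustration (a minimal sketch using the Python SDK; fetch_from_api and EtlWorkflow are made-up names for this example), this is roughly how that retry behaviour gets expressed with a RetryPolicy:

```python
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@activity.defn
async def fetch_from_api(source: str) -> str:
    # Hypothetical activity: pretend this hits a flaky API.
    return f"payload-from-{source}"


@workflow.defn
class EtlWorkflow:
    @workflow.run
    async def run(self, source: str) -> str:
        # Retries wait 1s, 2s, 4s, ... between attempts; after 5 failed
        # attempts the activity error propagates and the workflow fails.
        return await workflow.execute_activity(
            fetch_from_api,
            source,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                backoff_coefficient=2.0,
                maximum_interval=timedelta(minutes=1),
                maximum_attempts=5,
            ),
        )
```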

To do this, Temporal saves the state in the event history, which has a limited payload size. That limit became a headache for me at first (but it can be solved using Temporal's own features).

The reason for the payload size limit (you can override it in the server config, though sadly not through the Python SDK, only in Go) comes back to durable execution itself. Imagine you fetch a huge payload from an API or a database and the workflow fails. It will be retried from the last checkpoint saved in the event history, fetching everything again; the workflow then consumes a lot of computation as it retries, reprocesses that huge payload, and keeps saving it into the history, which soon hurts performance.

Hence, Temporal provides continue_as_new and child workflows to solve this. continue_as_new starts the workflow over with a fresh, empty event history, while child workflows let you iterate through the items and batch the work into one workflow per item, instead of relying on a single workflow.

This also helps with monitoring workflow progression, and keeps the workflows decoupled and atomic.
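A rough sketch of how the two features fit together, again using the Python SDK; ItemWorkflow, BatchWorkflow, process_item, and the batch size of 100 are all hypothetical:

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class ItemWorkflow:
    @workflow.run
    async def run(self, item_id: str) -> None:
        await workflow.execute_activity(
            "process_item",  # hypothetical activity name
            item_id,
            start_to_close_timeout=timedelta(minutes=10),
        )


@workflow.defn
class BatchWorkflow:
    @workflow.run
    async def run(self, item_ids: list[str]) -> None:
        # One child workflow per item: each child gets its own event history,
        # so a huge per-item payload never piles up in the parent's history.
        for item_id in item_ids[:100]:
            await workflow.execute_child_workflow(
                ItemWorkflow.run,
                item_id,
                id=f"item-{item_id}",
            )
        # If there is more work left, restart with a fresh, empty history
        # instead of letting this one grow unbounded.
        if len(item_ids) > 100:
            workflow.continue_as_new(item_ids[100:])
```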

Functional or OOP

In Temporal, a Workflow is composed of functions called Activities. An Activity can be a plain function or a method (OOP). Personally, if I could turn back time, I would just do functional programming instead of OOP in my pipeline. I used OOP because it was easier for me to access the methods within a class, with the help of IntelliSense in VSCode.

Nevertheless, you can still design stateless OOP, because we cannot keep state within Temporal, again due to durable execution. Hence, to use OOP, you need to design it so that each method is either a staticmethod or is treated like a normal function that just happens to live in a class (never saving or mutating object attributes), as in the sketch below.
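A small sketch of what I mean by stateless OOP; FetchActivities and its methods are made-up names:

```python
from temporalio import activity


class FetchActivities:
    def __init__(self, api_base_url: str) -> None:
        # Read-only configuration injected once at worker start is fine; what
        # must be avoided is writing to attributes from inside activity runs.
        self._api_base_url = api_base_url

    @activity.defn
    async def fetch_orders(self, day: str) -> list[dict]:
        # Behaves like a plain function: inputs in, outputs out, no mutation.
        # (real HTTP call elided in this sketch)
        activity.logger.info("fetching %s/orders for %s", self._api_base_url, day)
        return []

    @activity.defn
    async def normalize(self, rows: list[dict]) -> list[dict]:
        # Pure transformation; touches no instance state at all.
        return [dict(r, normalized=True) for r in rows]
```

When starting the worker you then register the bound methods (for example acts = FetchActivities(...) and activities=[acts.fetch_orders, acts.normalize]), so each Activity still behaves like a standalone function.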

Non-deterministic

Another concept I learned is non-determinism, courtesy of the non-determinism error. It is another one of those fancy words in data engineering, similar to idempotency: the process must produce the same value every time it runs.

This happened when I tried to get the current timestamp using datetime.now() inside the workflow. Anything non-deterministic is not allowed in a Workflow, again because of durable execution. Temporal needs to ensure that whenever the Workflow crashes or is reset and then retries, it still sees the same values as in the first execution. datetime.now() produces a different value on every retry, since it changes with the current time, hence it is not allowed in Temporal.

To solve this, we can compute the current time on the client side and pass it into the workflow. This makes sure time values like now or yesterday stay consistent and are allowed by Temporal.
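A minimal sketch of that fix, assuming the workflow takes the reference times as inputs; DailyEtlWorkflow, the task queue name, and the argument shapes are hypothetical:

```python
from datetime import datetime, timedelta, timezone

from temporalio.client import Client


async def start_daily_etl() -> None:
    client = await Client.connect("localhost:7233")

    # Computed once on the client; on replay the workflow sees exactly the
    # same values, so determinism is preserved.
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)

    await client.start_workflow(
        "DailyEtlWorkflow",  # hypothetical workflow name
        args=[now.isoformat(), yesterday.isoformat()],
        id="daily-etl",
        task_queue="etl-task-queue",
    )
```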

Steep learning curve

I have some experience with other data orchestrators such as Airflow, Prefect, and Dagster, but Temporal gave me the steepest learning curve, given its design around durable execution and gRPC, and because the way Activities and Workflows are run differs from Airflow and other DAG-based orchestrators, which only require functions to be wrapped and executed.

Throughout the development of the pipeline I kept asking myself why Temporal limits the payload size and history at all, because without Temporal my machine could easily handle the data and run the process without any issue; it was only after wrapping everything with Temporal that the history issue appeared (I asked myself many times why I was using this tool!).

However, thinking about Temporal's architecture, with its retries, state checkpointing, and durability, I pushed myself to grasp and understand it better, although some concepts are still hard for me, especially Signals, Messages, and Queries. At the moment I have only used Signals, not the latter two, although that implementation never made it to production; what I intended to build was a schedule for a series of workflows that depend on one another, communicating through Signals.

The problem is that using a Signal requires a static workflow ID, so the sender can find the workflow when the signal is sent (I call this waiting mode to fire). Unfortunately, in the current Temporal version, if you schedule a workflow, the workflow ID gets the schedule timestamp appended, which makes the ID dynamic and hard to hardcode (it was inaccurate when I used .now()). The issue has been raised in the community forum and on a GitHub issue, but it has not been patched yet.

Nevertheless, let it be, as I found a workaround (sketched after the list):

  1. Ensure every workflow is triggered by an endpoint (/etl_1, /etl_2, /etl_n…).

  2. Create an Activity that calls those endpoints (/schedule_all_endpoint).

  3. Create a workflow that calls this Activity: SchedulerWorkflow.

  4. Create an endpoint that uses a client to schedule this workflow.

  5. Trigger this endpoint. Boom, scheduled.
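A rough sketch of the workaround, assuming the ETL workflows are exposed as plain HTTP POST routes; trigger_all_etls, SchedulerWorkflow, the base URL, and the paths are my hypothetical names for the steps above:

```python
from datetime import timedelta

from temporalio import activity, workflow

# Third-party imports used only by activities still need to pass through the
# workflow sandbox when they live in the same module as a workflow definition.
with workflow.unsafe.imports_passed_through():
    import httpx


@activity.defn
async def trigger_all_etls() -> None:
    # Step 2: one Activity that simply calls every /etl_* endpoint.
    async with httpx.AsyncClient(base_url="http://localhost:8000") as http:
        for path in ("/etl_1", "/etl_2"):
            await http.post(path)


@workflow.defn
class SchedulerWorkflow:
    @workflow.run
    async def run(self) -> None:
        # Step 3: the workflow that actually gets scheduled; its only job is
        # to fire the Activity above, so the dynamic ID that the schedule
        # generates no longer matters for signalling the real ETL workflows.
        await workflow.execute_activity(
            trigger_all_etls,
            start_to_close_timeout=timedelta(minutes=5),
        )
```

Steps 4 and 5 are then just an ordinary endpoint that uses a Temporal client to create the schedule for SchedulerWorkflow, and a single manual call to that endpoint.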

Asynchronous

You need to ensure everything is async, and be clear about with respect to what. One workflow might run asynchronously with respect to another workflow while still depending on (waiting for the completion of) yet another. Every function you write should support async. For example, API requests, DB queries, and any other I/O-bound work need to be async, otherwise they will block the event loop and everything effectively becomes synchronous.
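A minimal sketch of keeping the I/O path async end to end, assuming httpx for the HTTP call; fetch_and_store and the URL are hypothetical:

```python
import httpx
from temporalio import activity


@activity.defn
async def fetch_and_store(source_url: str) -> int:
    # Async HTTP call: while waiting on the network, the worker's event loop
    # is free to run other activities instead of blocking on this one.
    async with httpx.AsyncClient() as http:
        resp = await http.get(source_url)
        resp.raise_for_status()
        rows = resp.json()
    # A blocking driver here (e.g. plain psycopg2) would stall every other
    # async activity on this worker; the DB write should be async as well.
    return len(rows)
```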

This is also what lets us implement the queue-worker pattern, where multiple workers poll a task queue, waiting for tasks and executing them as they are queued, which speeds things up across all Workflows. Just ensure everything is async, and we are good to go.
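A minimal sketch of the queue-worker side, assuming the workflows and activities from the earlier sketches live in a hypothetical etl_pipeline module:

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# Hypothetical module holding the workflows/activities from the earlier sketches.
from etl_pipeline import SchedulerWorkflow, fetch_and_store, trigger_all_etls


async def main() -> None:
    client = await Client.connect("localhost:7233")
    # Several copies of this worker process can poll the same task queue and
    # share the load of whatever gets enqueued on it.
    worker = Worker(
        client,
        task_queue="etl-task-queue",
        workflows=[SchedulerWorkflow],
        activities=[trigger_all_etls, fetch_and_store],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```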