I recently worked on a project focused on log anomaly detection using manageable machine learning pipelines. The pipelines mainly consist of data collection → feature extraction → feature engineering → detection/prediction → updating (maintenance).
It's important to have a solid UI to manage the pipelines so that I can easily review the whole chain. After much research, I found that many engineers recommend Airflow.
In Airflow, the core concept is the Directed Acyclic Graph (DAG). Through the implementation, I have confirmed that it is a truly powerful tool for managing machine learning pipelines, rather than relying on shell scripts. But I did encounter some challenges along the way and, fortunately, found solutions for them.
The challenges can be split into two main aspects: pipeline management and dynamic generation of tasks.
- Pipeline management – While implementing the machine learning pipelines in Airflow, I ran into the following problems:
- How to handle the dependencies within one DAG
- How to handle the dependencies between DAGs
- How to overcome known issues with ExternalTaskSensor
- How to overcome issues with the execution time
- Dynamic generation of tasks – When I tried to integrate the pipelines with our own ELK stack, I ran into the problem of how to dynamically generate the tasks in a DAG. The problem comes from the different log types, which include Linux, Windows, VPN and so on. The same type of logs from different clients also requires different treatment. The generation of tasks should therefore be scalable and automatic.
In the first place, I had many choices to make. For the operator, I could choose from the PythonOperator, BaseOperator, or BashOperator. For the dependencies, I could choose TriggerDagRunOperator, XCom, or SubDag.
After some testing, I found that the most effective solution is usually the simplest, even if it is not 100% perfect. I chose the following combination:
BaseOperator + DummyOperator + Plugins + XCom + for loop + ExternalTaskSensor
1. DummyOperator Usage
DummyOperator can be used to group tasks in a DAG. In order to structure different tasks into one clean workflow, I used the DummyOperator to connect them. Dummy tasks are never actually executed by the executor. After introducing the two tasks below, there is a common start task and a common end task connecting all the parallel tasks in the middle.
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
[Figure 1: Airflow graph view]
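To make the wiring concrete, here is a minimal, self-contained sketch of how the two dummy tasks fence a set of parallel tasks; the DAG name and the per-log-type tasks are hypothetical placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='log_anomaly_detection',  # hypothetical DAG name
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',
)

start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

# fence the parallel per-log-type tasks between the common start and end tasks
for log_type in ['linux', 'windows', 'vpn']:
    middle_task = DummyOperator(task_id='process_{}'.format(log_type), dag=dag)
    start_task >> middle_task >> end_task

The >> operator is Airflow's shorthand for set_downstream, so each middle task runs after start_task and before end_task.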
For the dynamic generation of tasks, I want to introduce a structure to organise the code. Most of the logs share the same processing logic, so I need to introduce several automatic variables inside the tasks. The basic structure looks like the following:
def Dynamic_Function(variable):
    task_1 = Function1(
        task_id='task_{}'.format(variable),
        dag=dag,
        # ... other parameters
    )
    return task_1

for variable in variables:
    task_1 = Dynamic_Function(variable)
The variables can be read from environment variables or just set as a list:
# the Python way to read values from an environment variable (e.g. loaded from a .env file)
import os
variables = os.getenv('variables').split(',')  # assuming a comma-separated value
This method is not that complex, but it is quite useful when multiple tasks share the same processing logic and differ only in a single variable, allowing the project to be scaled easily.
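Putting the two pieces together, here is a minimal sketch of the pattern, reusing the dag, start_task and end_task from the earlier sketch; the default list and the DummyOperator stand-in for Function1 are illustrative assumptions:

import os

from airflow.operators.dummy_operator import DummyOperator

# hypothetical: a comma-separated list, e.g. variables="linux,windows,vpn"
variables = os.getenv('variables', 'linux,windows,vpn').split(',')

def dynamic_task(variable):
    # DummyOperator stands in for the real custom operator (Function1)
    return DummyOperator(task_id='task_{}'.format(variable), dag=dag)

for variable in variables:
    task_1 = dynamic_task(variable)
    start_task >> task_1 >> end_task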
2. Plugin Operator and BaseOperator
Function1 is defined in a customised way in plugins/operators. You can find the detailed information at this link. The main content is shown below:
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

class MyFirstOperator(BaseOperator):
    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MyFirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # the actual task logic goes here
        ...

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]
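Once the plugin is registered, the operator can be imported from the plugins namespace and used like any built-in operator. A minimal usage sketch, assuming the plugin above is installed under plugins/ (the task id and parameter value are hypothetical):

# in Airflow 1.x, plugin operators are exposed under airflow.operators.<plugin_name>
from airflow.operators.my_first_plugin import MyFirstOperator

my_task = MyFirstOperator(
    task_id='my_first_task',
    my_operator_param='some value',  # the custom parameter defined in __init__
    dag=dag,
)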
I use it because I do not need to put all my code in the DAG file. Otherwise, the DAG code would be extremely redundant and hard to manage.
I use BaseOperator instead of PythonOperator for simplicity. The PythonOperator is more complex to control and requires setting more unnecessary parameters.
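For contrast, here is roughly what an equivalent task looks like with PythonOperator in Airflow 1.x; the callable and its arguments are hypothetical, and the extra wiring is what I wanted to avoid:

from airflow.operators.python_operator import PythonOperator

def process_logs(log_type, **context):
    # the actual processing logic lives in a separate callable
    ...

py_task = PythonOperator(
    task_id='process_linux',
    python_callable=process_logs,
    op_kwargs={'log_type': 'linux'},
    provide_context=True,  # needed in Airflow 1.x to receive the context
    dag=dag,
)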
With the above two solutions, dynamic tasks can now be easily built in one DAG. The following solutions address the connection and concurrency problems I met during the project.
3. XCom & ExternalTaskSensor
Now, I have to solve three key problems:
- How to save the result for the next task?
- How to get the result from the last task?
- How to make sure the result is within the right time interval?
Airflow provides powerful solutions to these problems with XCom and ExternalTaskSensor.
To save the result from the current task, XCom is used. It is a bit similar to git in that you push and pull values. To use it, xcom_push and xcom_pull are the main functions needed. But there is a size limitation per value, which is 48KB. Normally, you do not need to worry about the size, but it is advisable to save only small intermediate values in XCom and not big files.
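A minimal sketch of the push/pull pattern inside custom operators; the key name, value, and task ids are hypothetical:

from airflow.models import BaseOperator

class PushOperator(BaseOperator):
    def execute(self, context):
        # push a small intermediate value for downstream tasks
        context['ti'].xcom_push(key='feature_path', value='/tmp/features.csv')

class PullOperator(BaseOperator):
    def execute(self, context):
        # pull the value pushed by the upstream task, addressed by its task_id
        feature_path = context['ti'].xcom_pull(task_ids='push_task', key='feature_path')
        self.log.info('got %s', feature_path)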
If you want to extract the result obtained from a specified task in the previous DAG, combined with the dynamic tasks, the extraction process is independent and you should use the ExternalTaskSensor with the following setting:
for variable in variables:
    ...
    # create the task that depends on the upstream DAG
    external_sensor = ExternalTaskSensor(
        task_id='ext_sensor_task_{}'.format(variable),  # task ids must be unique within a DAG
        external_dag_id='xxx',
        external_task_id='xxx_{}'.format(variable),
        timeout=300,
        dag=dag,
    )
    ...
One thing to mention here: you should not use the end_task of the previous DAG as the external task to wait for. Otherwise, the sensor has to wait for every task in the previous DAG to finish, and the run can easily be pushed into the next day.
4. Execution Time
Handling of execution time is somewhat limited in Airflow 1.x (I have not tested 2.x). In version 1.x, changing the timezone in airflow.cfg does not help.
But you can work around the problem in the DAG definition itself. The pendulum library is a really great option.
import pendulum
from datetime import timedelta

# get the current time in the local timezone, truncated to the hour
current_time = pendulum.now('Europe/London').start_of('hour')

dag = DAG(
    dag_id=dag_id,
    # start one hour before the current time, so the first run triggers immediately
    start_date=current_time.subtract(hours=1),
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
)
With this setting, you get a trial run scheduled just before the current time, and you can make sure the execution time matches your local timezone.
5. ExternalTaskSensor Stuck Problem
When people design dependent tasks across different DAGs, the ExternalTaskSensor is a common choice. But if you do not follow some best practices, it can quite easily get stuck. The main problem relates to the time settings of the DAGs. Among the other errors that can occur, the most common is when the previous task generates a large intermediate value that cannot be transferred to the external task because of the size limitation on intermediate value storage.
So, how best to set the time for DAGs? Based on an answer from Stack Overflow: the DAGs don't need to have the same start_date. If you create your ExternalTaskSensor task without the execution_delta or execution_date_fn parameters, then the two DAGs need to have the same execution date. It so happens that if two DAGs have the same schedule, a scheduled run in each interval will have the same execution date.
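If the two schedules do not line up, execution_delta can express the offset. A minimal sketch, assuming the upstream DAG runs one hour earlier than the downstream one (the DAG and task names are hypothetical), although, as noted below, I ended up avoiding these parameters:

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor

external_sensor = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='upstream_task',
    # the upstream DAG's runs start one hour earlier than this DAG's runs
    execution_delta=timedelta(hours=1),
    timeout=300,
    dag=dag,
)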
The optimal choice is to exclude execution_delta and execution_date_fn if you encounter challenges when computing the time. You should also never manually trigger the DAG (in the Links column) in the Web UI if the result will be sent to the next DAG, as manual triggers generate different execution dates. In practice, I defined the same start_date by setting a specific date. When I start the DAGs in the Web UI, I press all the DAG buttons at the same time if those DAGs depend on one another.
[Figure 2: DAGs view]
This is a very brief description of my solutions for the tricky problems I encountered. Thanks for reading!
References:
- How do I trigger an Airflow DAG using TriggerDagRunOperator? (Stack Overflow; see also https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand)
- Airflow ExternalTaskSensor gets stuck (Stack Overflow)
- Sensing the completion of external airflow tasks (Medium)
- Creating a dynamic DAG using Apache Airflow (Towards Data Science)