Implementation and Dynamic Generation for Tasks in Apache Airflow

Nov 23, 2022 | Detection, Research

I recently worked on a project focused on log anomaly detection using manageable machine learning pipelines. The pipelines mainly consist of data collection → feature extraction → feature engineering → detection/prediction → updating (maintenance).

It’s important to have a solid UI to manage the pipelines so that I can easily review the chain of pipelines. After much research, I found that many engineers recommended Airflow.

In Airflow, the core concept is the Directed Acyclic Graph (DAG). Through the implementation, I confirmed that Airflow is a truly powerful tool for managing machine learning pipelines, compared with relying on shell scripts. I did, however, encounter some challenges along the way and, fortunately, found solutions for them.

The challenges can be split into two main aspects: pipeline management and dynamic generation of tasks.

  • Pipeline management: while implementing the machine learning pipelines in Airflow, I ran into the following problems:
      • How to handle dependencies within one DAG
      • How to handle dependencies between DAGs
      • How to overcome known issues with ExternalTaskSensor
      • How to overcome issues with the execution time
  • Dynamic generation of tasks: when I tried to integrate the pipelines with our own ELK stack, I ran into the problem of how to generate the tasks in a DAG dynamically. The problem comes from the different log types, which include Linux, Windows, VPN and so on, and logs of the same type from different clients also require different treatment. Task generation therefore has to be scalable and automatic.

At first, I had many choices to make. For the operator, I could choose from the PythonOperator, BaseOperator or BashOperator. For the dependencies, I could choose TriggerDagRunOperator, XCom or SubDag.

After some testing, I found the most effective solution is usually the simplest, even when not 100% perfect. I chose the following combination:

BaseOperator + DummyOperator + Plugins + XCom + for loop + ExternalTaskSensor

1. DummyOperator Usage

DummyOperator can be used to group tasks in a DAG. To structure the different tasks into one clean workflow, I used DummyOperator tasks to connect them; they do no work of their own. With the two tasks below, there is a common start task and a common end task connecting all the parallel tasks in the middle.

from airflow.operators.dummy_operator import DummyOperator  # Airflow 1.x import path

start_task = DummyOperator(
    task_id='start_task',
    dag=dag,
)
end_task = DummyOperator(
    task_id='end_task',
    dag=dag,
)

Figure 1: Airflow graph view
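To see what the two DummyOperator tasks give you, here is a stdlib-only sketch (plain Python, not Airflow code) that models the resulting start → parallel tasks → end graph with graphlib and groups the tasks into stages whose members could run concurrently. The helper names and task IDs are hypothetical, and graphlib needs Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def fan_out_fan_in(middle_tasks, start="start_task", end="end_task"):
    """Map each task to the set of tasks it depends on, mirroring
    start_task >> [middle tasks] >> end_task."""
    graph = {start: set()}
    for task in middle_tasks:
        graph[task] = {start}        # every middle task waits for start
    graph[end] = set(middle_tasks)   # end waits for every middle task
    return graph


def parallel_stages(graph):
    """Group tasks into stages; tasks in the same stage have no
    dependencies on each other and could run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


# hypothetical task IDs following the task_<variable> naming pattern
graph = fan_out_fan_in(["task_linux", "task_windows", "task_vpn"])
for stage in parallel_stages(graph):
    print(stage)
```

The output has three stages: start_task alone, then all three middle tasks together, then end_task alone. In the DAG itself the same shape comes from writing start_task >> task >> end_task for each generated task.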

For the dynamic generation of tasks, I use a simple structure to organise the code. Most of the logs share the same processing logic, so the generated tasks only need to differ by one variable (such as the log type). The basic structure looks like this:

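def dynamic_function(variable):
    # build one task for this variable; the variable is part of the
    # task_id so that every generated task_id is unique within the DAG
    task_1 = Function1(
        task_id='task_{}'.format(variable),
        dag=dag,
        …
    )
    return task_1


for variable in variables:
    task_1 = dynamic_function(variable)
    # place each generated task between the common start and end tasks
    start_task >> task_1 >> end_task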
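Because logs of the same type from different clients may need different treatment, the single loop variable can be widened to a (client, log type) pair. The sketch below is plain Python under that assumption: the SOURCES mapping and the task_specs helper are hypothetical, and they only build the per-task settings. In the DAG, each spec's values would be fed to a factory like the one above instead of a single variable.

```python
# Hypothetical mapping from each client to the log types collected for it.
SOURCES = {
    "client_a": ["linux", "windows"],
    "client_b": ["vpn"],
}


def task_specs(sources):
    """Yield one spec per (client, log type) pair.

    Each spec carries a unique task_id plus the values the shared
    processing logic needs to treat that client's logs differently.
    """
    for client in sorted(sources):
        for log_type in sources[client]:
            yield {
                "task_id": "task_{}_{}".format(client, log_type),
                "client": client,
                "log_type": log_type,
            }


for spec in task_specs(SOURCES):
    print(spec["task_id"])
```

Adding a client or a log type then only means editing the mapping, which keeps task generation automatic.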

The variables can be read from an environment variable or simply set as a list:

import os

# reads the process environment; a .env file must first be loaded into it (e.g. by docker-compose)
variables = os.getenv('variables', '').split()

This method is not complex, but it is quite useful when several tasks share the same processing logic and differ in only one variable, and it lets the project scale easily.
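Reading the list from the environment works best with a little validation, since a missing variable or an odd value only surfaces later as a broken DAG. The following is a plain-Python sketch; load_variables, its default list and the TASK_ID_PATTERN regex are assumptions for illustration (the pattern approximates Airflow's rule that task IDs use only letters, digits, underscores, dashes and dots).

```python
import os
import re

# Approximates Airflow's allowed task ID characters (the exact regex is an assumption).
TASK_ID_PATTERN = re.compile(r"^[\w.-]+$")


def load_variables(env_name="variables", default=("linux", "windows", "vpn")):
    """Read a whitespace-separated list from an environment variable.

    Falls back to `default` when the variable is unset or empty, drops
    duplicates while keeping order, and rejects values that would produce
    an invalid task ID of the form task_<value>.
    """
    values = os.getenv(env_name, "").split() or list(default)
    unique = list(dict.fromkeys(values))
    invalid = [v for v in unique if not TASK_ID_PATTERN.match("task_" + v)]
    if invalid:
        raise ValueError("cannot build task IDs from: {}".format(invalid))
    return unique
```

For example, with variables="linux windows vpn" in the environment, load_variables() returns ['linux', 'windows', 'vpn'], and a value such as bad/name raises a ValueError while the DAG file is parsed rather than later.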

2. Plugin Operator and BaseOperator

Function1 is a custom operator defined in plugins/operators. You can find detailed information at this link. The main parts are shown below:

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class MyFirstOperator(BaseOperator):

    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        # initialise BaseOperator first, then store the custom parameter
        super(MyFirstOperator, self).__init__(*args, **kwargs)
        self.operator_param = my_operator_param

    def execute(self, context):
        # the task's processing logic goes here
        …


class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

I use a plugin so that I do not have to put all my code in the DAG file; otherwise, the DAG code would be extremely redundant and hard to manage.

I subclass BaseOperator instead of using PythonOperator for simplicity: in my experience, the PythonOperator was harder to control and required setting more parameters that I did not need.
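In the same spirit of keeping code out of the DAG file, one possible layout (an assumption, not the original project's code) puts the per-log-type logic in plain Python functions and lets the operator's execute() do nothing but dispatch to them, so the logic can be unit tested without Airflow. HANDLERS, handler and process are hypothetical names, and the processing bodies are placeholders.

```python
from typing import Callable, Dict, List

# Hypothetical registry mapping a log type to its processing function.
HANDLERS: Dict[str, Callable[[List[str]], dict]] = {}


def handler(log_type):
    """Decorator that registers a processing function for one log type."""
    def register(func):
        HANDLERS[log_type] = func
        return func
    return register


@handler("linux")
def process_linux(lines):
    # placeholder logic: count non-empty lines
    return {"log_type": "linux", "records": sum(1 for line in lines if line.strip())}


@handler("vpn")
def process_vpn(lines):
    # placeholder logic: count every line
    return {"log_type": "vpn", "records": len(lines)}


def process(log_type, lines):
    """Dispatch to the registered function; an operator's execute() could call this."""
    try:
        func = HANDLERS[log_type]
    except KeyError:
        raise ValueError("no handler registered for {!r}".format(log_type)) from None
    return func(lines)
```

With this split, the operator only needs the log type as a parameter, and a log type without a handler fails with a clear error.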

With these two solutions, the dynamic tasks can easily be built in one DAG. The following solutions deal with the connection and concurrency problems I met during the project.

3. XCom & ExternalTaskSensor

Now, I have to solve three key problems:

  • How to save the result for the next task? 
  • How to get the result from the last task?
  • How to make sure the result is within the right time interval?

Airflow provides powerful solutions for these problems with XCom and ExternalTaskSensor.

To save the result of the current task, use XCom. It works a little like git: xcom_push stores a value and xcom_pull retrieves it in a later task. There is, however, a size limit of 48KB. Normally you do not need to worry about it, but it is advisable to store only small intermediate values in XCom and to keep big files out of it.
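A cheap way to respect that limit is to measure a value before pushing it. The sketch below is plain Python, not Airflow code: it assumes the value is serialized as JSON (depending on version and configuration, Airflow may pickle it instead), treats 48KB as 48 * 1024 bytes, and uses hypothetical helper names.

```python
import json

# The 48KB limit mentioned above, taken as 48 * 1024 bytes (assumption).
XCOM_LIMIT_BYTES = 48 * 1024


def xcom_payload_size(value):
    """Approximate stored size in bytes, assuming JSON serialization."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value, limit=XCOM_LIMIT_BYTES):
    """True if the serialized value stays within the limit."""
    return xcom_payload_size(value) <= limit


small = {"path": "/data/linux/features.parquet", "rows": 1200}
large = ["x" * 100] * 1000
print(fits_in_xcom(small), fits_in_xcom(large))
```

If the check fails, a small value such as a file path can be pushed instead of the data itself.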

If the dynamically generated tasks need the result of a specific task in a previous DAG, that dependency crosses DAG boundaries, so you should create one ExternalTaskSensor per variable with the following settings:

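from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 1.x import path

for variable in variables:
    # one sensor per variable, waiting on the matching upstream task;
    # the variable is part of task_id because task IDs must be unique in a DAG
    external_sensor = ExternalTaskSensor(
        task_id='ext_sensor_task_{}'.format(variable),
        external_dag_id='xxx',
        external_task_id='xxx_{}'.format(variable),
        timeout=300,  # give up after 300 seconds without a match
        dag=dag,
    )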

Note that the sensor should wait for the specific upstream task, not for the end_task of the previous DAG, unless you want to wait until every task of the previous run has finished before moving on.

4. Execution Time

Execution time handling is somewhat limited in Airflow 1.x (I have not tested 2.x). In version 1.x, changing the timezone in airflow.cfg did not help.

You can work around this by giving the DAG an explicit, timezone-aware start_date. The pendulum library is a really good option for this.

from datetime import timedelta

import pendulum
from airflow import DAG

local_tz = pendulum.timezone('Europe/London')

dag = DAG(
    dag_id=dag_id,
    # the current hour in London time, truncated to :00, minus one hour
    start_date=pendulum.now(local_tz).start_of('hour').subtract(hours=1),
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
)

With this setting, the first schedule interval starts one hour before the current hour, so a first (trial) run is triggered straight away, and the times match your local timezone.
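The effect of moving start_date one hour back can be checked with a plain-Python model of interval scheduling. This is a sketch, not Airflow code: due_runs is a hypothetical helper that assumes catch-up is on, uses UTC instead of Europe/London for simplicity, and ignores everything else the scheduler does. It encodes the rule that a run labelled with execution date X covers [X, X + interval) and only starts after X + interval has passed.

```python
from datetime import datetime, timedelta, timezone


def due_runs(start_date, interval, now):
    """Return the execution dates whose interval has closed by `now`.

    A run labelled X covers [X, X + interval) and is only started once
    X + interval has passed (catch-up assumed to be on).
    """
    runs = []
    execution_date = start_date
    while execution_date + interval <= now:
        runs.append(execution_date)
        execution_date += interval
    return runs


hour = timedelta(hours=1)
now = datetime(2022, 11, 23, 10, 20, tzinfo=timezone.utc)
top_of_hour = now.replace(minute=0, second=0, microsecond=0)

# start_date one hour back, as in the DAG above: the 09:00 run is already due
print(due_runs(top_of_hour - hour, hour, now))
# start_date at the current hour: nothing runs until 11:00
print(due_runs(top_of_hour, hour, now))
```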

5. ExternalTaskSensor Stuck Problem

When tasks in different DAGs depend on each other, ExternalTaskSensor is the usual tool. If you do not follow some best practices, however, it can easily get stuck. The main problem relates to the time settings of the DAGs. Among the other errors that can occur, a common one is when the previous task generates a large intermediate value that cannot be passed to the external task because of the size limit on intermediate value storage.

So how should you set the time for DAGs? According to an answer on Stack Overflow, the DAGs do not need to have the same start_date. If you create your ExternalTaskSensor task without execution_delta or execution_date_fn, the two DAGs need to have the same execution date. Conveniently, if two DAGs have the same schedule, the scheduled runs in each interval will have the same execution date.

The safest choice is to leave out execution_delta and execution_date_fn if you find the time offsets hard to compute. You should never manually trigger the DAG in the Web UI (in the Links column) if its result will be sent to the next DAG, because a manual run gets a different execution date from the scheduled runs. In practice, I gave the dependent DAGs the same start_date by setting a specific date, and when I start the DAGs in the Web UI, I switch on all the DAGs that depend on one another at the same time.

Figure 2: DAGs view
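The sensor's date matching described above can be modelled in a few lines of plain Python. This is a sketch, not Airflow code: sensor_target_date and sensor_would_match are hypothetical helpers, the dates are made up, and a real sensor also checks the upstream task's state. It shows why runs on the same schedule line up, why a manually triggered run (whose execution date is roughly the moment it was triggered) does not, and how execution_delta compensates for a fixed offset.

```python
from datetime import datetime, timedelta


def sensor_target_date(execution_date, execution_delta=None, execution_date_fn=None):
    """Return the upstream execution date an ExternalTaskSensor looks for.

    Subtract execution_delta if given, else apply execution_date_fn if
    given, else reuse the sensor's own execution date.
    """
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        return execution_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(execution_date)
    return execution_date


def sensor_would_match(upstream_dates, execution_date, **kwargs):
    """True if the upstream DAG has a run with the date the sensor looks for."""
    return sensor_target_date(execution_date, **kwargs) in upstream_dates


hourly = timedelta(hours=1)
start = datetime(2022, 11, 23)
upstream = {start + i * hourly for i in range(24)}  # scheduled upstream runs

scheduled = datetime(2022, 11, 23, 10)      # same hourly schedule downstream
manual = datetime(2022, 11, 23, 10, 7, 13)  # manual trigger: roughly the click time
offset = datetime(2022, 11, 23, 10, 30)     # downstream scheduled at :30

print(sensor_would_match(upstream, scheduled))
print(sensor_would_match(upstream, manual))
print(sensor_would_match(upstream, offset, execution_delta=timedelta(minutes=30)))
```

The manual run finds no upstream run with its execution date, which is exactly the situation in which the sensor keeps poking until its timeout.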

This is a very brief description of my solutions for the tricky problems I encountered. Thanks for reading!

References:

How do I trigger Airflow -dag using TriggerDagRunOperator, stackoverflow.com

Airflow ExternalTaskSensor gets stuck, stackoverflow.com

Sensing the completion of external airflow tasks (Not the best title), medium.com

Creating a dynamic DAG using Apache Airflow, towardsdatascience.com

Disclaimer

The information provided on this website is to be used for educational purposes only. The author is in no way responsible for any misuse of the information provided. Any actions and or activities related to the material contained within this website is solely your responsibility.
