Most organizations eventually encounter the need to automate and schedule repetitive tasks. A common approach is to use shell scripts and cron jobs. This article is a quick introduction to an alternative: writing Python code and running it with Cloud Composer.
By Espen Meidell · December 9, 2020 · 5 min read
Cloud Composer is a managed Apache Airflow service running on Google Cloud. Apache Airflow was created by Airbnb to manage and monitor their workflows, and has since become an Apache Software Foundation project. Many traditional workflow orchestration systems rely on XML configuration files. Airflow, on the other hand, lets you write workflows in ordinary Python code. This simplifies integration with other services (you can use regular Python libraries) and makes it easier for developers to express logic in their workflows.
Using a managed Airflow service removes most of the maintenance and operations work of running Airflow yourself. We also benefit from integration with GCP services such as authentication, monitoring, and logging.
Some use cases where we have successfully used Cloud Composer include:
Workflows are defined as directed acyclic graphs (DAGs) written in Python. The DAG file defines tasks and the dependencies between them. Tasks can be almost anything and are implemented by operators, for example the BashOperator for running shell commands and the PythonOperator for calling Python functions (both are used in the example below).
It is also simple to define your own operators if you have custom requirements. To do this, you extend the BaseOperator class and write your own logic, as sketched below.
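As a rough illustration, here is a minimal sketch of a custom operator. The GreetingOperator name and its name parameter are made up for this example; the part that carries over to real operators is overriding execute():

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class GreetingOperator(BaseOperator):
    """Hypothetical operator that logs a greeting for a given name."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(GreetingOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is where the task does its work when the scheduler runs it.
        message = "Hello, {}!".format(self.name)
        print(message)
        return message

The operator can then be used in a DAG just like the built-in ones, e.g. GreetingOperator(task_id='greet', name='Composer').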
A special type of operator called a sensor is used to wait for changes. A sensor can, for example, poll a storage bucket and trigger a DAG run when a new file appears.
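Airflow already ships with sensors for common cases, including Cloud Storage, but the underlying pattern is small enough to sketch. In this hypothetical example the bucket and object names are placeholders; the key idea is that poke() is called repeatedly until it returns True:

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from google.cloud import storage


class GcsObjectSensor(BaseSensorOperator):
    """Hypothetical sensor that waits for an object to appear in a bucket."""

    @apply_defaults
    def __init__(self, bucket, object_name, *args, **kwargs):
        super(GcsObjectSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.object_name = object_name

    def poke(self, context):
        # Called every poke_interval seconds; the task succeeds once this returns True.
        client = storage.Client()
        return client.bucket(self.bucket).blob(self.object_name).exists()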
The first step is to visit the Google Cloud Console and create a new environment. You should be fine using the default settings.
"Cloud Composer will create a Kubernetes cluster for your Airflow environment. If you want to avoid any costs, consider running Airflow locally."
Once the environment is created, you can click the Airflow link to open the Airflow web server. Here you can see all DAGs and previous runs, and trigger new runs manually. By default, Cloud Composer includes an airflow_monitoring DAG that checks that the environment is working. The web server is protected by Google IAP, so only authorised users can access it.
To add new DAGs, click the DAGs link on the right side of the environments table. This opens a storage bucket. Any Python script you add here will be discovered and run by Cloud Composer.
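You can upload the file through the Cloud Console, with gsutil, or programmatically. Here is a rough sketch using the Cloud Storage client library; the bucket name below is a placeholder for the DAGs bucket created for your environment:

from google.cloud import storage

client = storage.Client()
# Replace with the DAGs bucket shown for your Composer environment.
bucket = client.bucket("us-central1-my-environment-bucket")
# Composer looks for DAG files in the dags/ folder of the bucket.
bucket.blob("dags/christmas_dag.py").upload_from_filename("christmas_dag.py")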
We will deploy the simple DAG below to illustrate how Cloud Composer works. The DAG consists of three tasks: one that prints "Hello World" with the BashOperator, one that prints a greeting from a Python function with the PythonOperator, and one that pings google.com.
Tasks two and three can run in parallel, but they both depend on task one. Airflow uses the bit shift operators (>> and <<) to describe dependencies between tasks.
import datetime

import airflow
from airflow.operators import bash_operator, python_operator

default_args = {
    'owner': 'Example DAG',
    'start_date': datetime.datetime(2020, 12, 7),
}


def python_hello_world(ds, **kwargs):
    print("Hello from the python operator!")


with airflow.DAG(
        'christmas_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(minutes=5)) as dag:

    # Task 1: print "Hello World" with a bash command.
    bash_hello_world = bash_operator.BashOperator(
        task_id='bash_hello_world', bash_command='echo "Hello World"'
    )

    # Task 2: call a Python function.
    python_task = python_operator.PythonOperator(
        task_id='python_task',
        provide_context=True,
        python_callable=python_hello_world
    )

    # Task 3: ping google.com five times.
    ping_task = bash_operator.BashOperator(
        task_id='ping_google', bash_command='ping -c5 google.com'
    )

    # Tasks 2 and 3 both depend on task 1.
    bash_hello_world >> [ping_task, python_task]
In the DAG constructor we specify the name of the DAG and how often we want it to run (every 5 minutes). After uploading the file to GCS, the DAG will appear in the Airflow UI. This may take a few minutes.
Clicking on the name of the DAG shows us more detailed information about the DAG. We can see a visual representation of the graph, historical runs, and even the code that defines the DAG. It is also possible to manually trigger a run from here.
After a short while the borders around the tasks should turn green, indicating that the tasks executed successfully. Just above the graph there is a legend explaining the different color codes.
If we click on a task we can see the parameters passed to the task and access the logs. Below we can see the logs from the task pinging google.com.
The task sent five pings to google.com, exactly what we expected it to do. The logs also include some information about the worker that ran the task.
There are a number of advanced features that are beyond the scope of this article. Combining them makes it possible to build very powerful workflows. Some of them include:
Cloud Composer / Airflow is a very powerful tool, but it might not be the right tool for your use case. Here is a short list of some advantages and disadvantages.