Step by step tutorial
Developing a typical workflow involves the following steps:
Create a DAG file that calls code to construct the workflow in my-dags/my_dags/dags.
Create a workflow file containing code for the workflow itself in my-dags/my_dags/workflows.
Create one or more schema files for the workflow data loaded into BigQuery in my-dags/my_dags/database/schema.
Create a file with tests for the workflow in my-dags/tests/workflows.
Create a documentation file about the workflow in my-dags/docs and update the index.rst file.
In these file paths, my-dags is the workflows project folder and my_dags is the package name.
1. Creating a DAG file
For Airflow to pick up new DAGs, a DAG file is required that contains the DAG object as well as the keywords ‘airflow’ and ‘DAG’.
Any code in this file is executed every time the file is loaded into the Airflow DagBag, which is once per minute by default.
This means that the code in this file should be as minimal as possible, preferably limited to just creating the DAG object.
The filename is usually similar to the DAG id and the file should be inside the my-dags/my_dags/dags directory, where my-dags is the workflows project folder and my_dags is the package name.
An example of the DAG file:
# The keywords airflow and DAG are required to load the DAGs from this file, see bullet 2 in the Apache Airflow FAQ:
# https://airflow.apache.org/docs/stable/faq.html
from my_dags.workflows.my_workflow import MyWorkflow
workflow = MyWorkflow()
globals()[workflow.dag_id] = workflow.make_dag()
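The keyword requirement can be illustrated with a small sketch. The function below only approximates the check that Airflow performs before parsing a file; the name might_contain_dag and the case-insensitive comparison are assumptions made for this illustration, not a copy of Airflow's code. It shows why the comment in the example DAG file matters: without it, the file never mentions ‘airflow’.

```python
def might_contain_dag(source: str) -> bool:
    """Approximate the keyword heuristic: both 'airflow' and 'dag' must appear in the file."""
    lowered = source.lower()
    return "airflow" in lowered and "dag" in lowered


# The example DAG file, with and without its leading comment.
CODE_ONLY = (
    "from my_dags.workflows.my_workflow import MyWorkflow\n"
    "workflow = MyWorkflow()\n"
    "globals()[workflow.dag_id] = workflow.make_dag()\n"
)
WITH_COMMENT = "# The keywords airflow and DAG are required to load the DAGs from this file\n" + CODE_ONLY

print(might_contain_dag(WITH_COMMENT))
print(might_contain_dag(CODE_ONLY))
```

Under these assumptions the version without the comment is skipped, because ‘workflow’ does not contain the substring ‘airflow’.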
2. Creating a workflow file
The workflow file contains the release class at the top, then the workflow class and, at the bottom, any functions that are used within these classes.
This filename is also usually similar to the DAG id and the file should be inside the my-dags/my_dags/workflows directory.
An example of the workflow file:
import pendulum
from observatory.platform.workflows.workflow import Release, Workflow
from observatory.platform.utils.airflow_utils import AirflowVars, AirflowConns


class MyWorkflowRelease(Release):
    def __init__(self, dag_id: str, snapshot_date: pendulum.DateTime):
        """Construct a Release instance

        :param dag_id: the id of the DAG.
        :param snapshot_date: the release date (used to construct release_id).
        """

        self.snapshot_date = snapshot_date
        release_id = f"{dag_id}_{self.snapshot_date.strftime('%Y_%m_%d')}"
        super().__init__(dag_id, release_id)


class MyWorkflow(Workflow):
    """MyWorkflow Workflow."""

    DAG_ID = "my_workflow"

    def __init__(
        self,
        dag_id: str = DAG_ID,
        start_date: pendulum.DateTime = pendulum.datetime(2020, 1, 1),
        schedule_interval: str = "@weekly",
        catchup: bool = True,
        queue: str = "default",
        max_retries: int = 3,
        max_active_runs: int = 1,
        airflow_vars: list = None,
        airflow_conns: list = None,
    ):
        """Construct a Workflow instance.

        :param dag_id: the id of the DAG.
        :param start_date: the start date of the DAG.
        :param schedule_interval: the schedule interval of the DAG.
        :param catchup: whether to catchup the DAG or not.
        :param queue: the Airflow queue name.
        :param max_retries: the number of times to retry each task.
        :param max_active_runs: the maximum number of DAG runs that can be run at once.
        :param airflow_vars: list of airflow variable keys, for each variable it is checked if it exists in airflow
        :param airflow_conns: list of airflow connection keys, for each connection it is checked if it exists in airflow
        """

        if airflow_vars is None:
            airflow_vars = [
                AirflowVars.DATA_PATH,
                AirflowVars.PROJECT_ID,
                AirflowVars.DATA_LOCATION,
                AirflowVars.DOWNLOAD_BUCKET,
                AirflowVars.TRANSFORM_BUCKET,
            ]

        # if airflow_conns is None:
        #     airflow_conns = [AirflowConns.SOMEDEFAULT_CONNECTION]

        super().__init__(
            dag_id,
            start_date,
            schedule_interval,
            catchup=catchup,
            queue=queue,
            max_retries=max_retries,
            max_active_runs=max_active_runs,
            airflow_vars=airflow_vars,
            airflow_conns=airflow_conns,
        )

        # Add sensor tasks
        # self.add_operator(some_airflow_sensor)

        # Add setup tasks
        self.add_setup_task(self.check_dependencies)

        # Add generic tasks
        self.add_task(self.task1)
        self.add_task(self.cleanup)

    def make_release(self, **kwargs) -> MyWorkflowRelease:
        """Make a release instance.

        :param kwargs: the context passed from the PythonOperator.
        :return: A release instance
        """

        snapshot_date = kwargs["execution_date"]
        release = MyWorkflowRelease(dag_id=self.dag_id, snapshot_date=snapshot_date)
        return release

    def task1(self, release: MyWorkflowRelease, **kwargs):
        """Add your own comments.

        :param release: A MyWorkflowRelease instance
        :param kwargs: The context passed from the PythonOperator.
        :return: None.
        """

        pass

    def cleanup(self, release: MyWorkflowRelease, **kwargs):
        """Delete downloaded, extracted and transformed files of the release.

        :param release: A MyWorkflowRelease instance
        :param kwargs: The context passed from the PythonOperator.
        :return: None.
        """

        release.cleanup()
Using Airflow XComs
XComs are an Airflow concept and are used within the workflows to pass information between tasks. The Airflow documentation describes XComs as follows:
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from. They can have any (serializable) value, but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes. XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push and xcom_pull methods on Task Instances. Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.
They are commonly used to pass on release information in workflows.
One task at the beginning of the workflow will retrieve release information such as the release date or possibly a
relevant release url.
The release information is then pushed during this task using Xcoms and it is pulled in the subsequent tasks, so a
release instance can be made with the given information.
An example of this can be seen in the get_release_info method of the StreamTelescope class.
The get_release_info method:
def get_release_info(self, **kwargs) -> bool:
    """Push the release info (start date, end date, first release) using Xcoms.

    :param kwargs: The context passed from the PythonOperator.
    :return: True once the release info has been pushed.
    """

    ti: TaskInstance = kwargs["ti"]

    first_release = False
    release_info = ti.xcom_pull(key=self.RELEASE_INFO, include_prior_dates=True)
    if not release_info:
        first_release = True
        # set start date to the start of the DAG
        start_date = pendulum.instance(kwargs["dag"].default_args["start_date"]).start_of("day")
    else:
        # set start date to end date of previous DAG run, add 1 day, because end date was processed in prev run.
        start_date = pendulum.parse(release_info[1]) + timedelta(days=1)
    # set end date to current day, subtract 1 day, because data from same day might not be available yet.
    end_date = pendulum.today("UTC") - timedelta(days=1)
    logging.info(f"Start date: {start_date}, end date: {end_date}, first release: {first_release}")

    # Turn dates into strings. Prefer JSON'able data over pickling in Airflow 2.
    start_date = start_date.format("YYYYMMDD")
    end_date = end_date.format("YYYYMMDD")

    ti.xcom_push(self.RELEASE_INFO, (start_date, end_date, first_release))
    return True
The start date, end date and first_release boolean are pushed using XComs with the RELEASE_INFO property as a key.
The info is then used within the make_release method.
See for example the make_release method of the OrcidTelescope, which uses the StreamTelescope as a template.
def make_release(self, **kwargs) -> OrcidRelease:
    """Make a release instance. The release is passed as an argument to the function (TelescopeFunction) that is
    called in 'task_callable'.

    :param kwargs: the context passed from the PythonOperator. See
    https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are
    passed to this argument.
    :return: an OrcidRelease instance.
    """

    ti: TaskInstance = kwargs["ti"]
    start_date, end_date, first_release = ti.xcom_pull(key=OrcidTelescope.RELEASE_INFO, include_prior_dates=True)

    release = OrcidRelease(
        self.dag_id, pendulum.parse(start_date), pendulum.parse(end_date), first_release, self.max_processes
    )
    return release
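The date-window logic that get_release_info pushes through XComs can be tried out without Airflow. The following is a minimal sketch using only the standard library; next_release_window and its parameters are hypothetical names introduced for this illustration, and the previous release info is passed in directly instead of being pulled from a TaskInstance.

```python
from datetime import date, datetime, timedelta
from typing import Optional, Tuple

DATE_FORMAT = "%Y%m%d"  # same layout as the YYYYMMDD strings pushed with XComs

ReleaseInfo = Tuple[str, str, bool]


def next_release_window(previous: Optional[ReleaseInfo], dag_start: date, today: date) -> ReleaseInfo:
    """Return (start_date, end_date, first_release) as JSON-friendly values."""
    first_release = not previous
    if first_release:
        # No earlier run pushed release info, so start at the DAG start date.
        start = dag_start
    else:
        # The previous end date was already processed, so resume one day later.
        previous_end = datetime.strptime(previous[1], DATE_FORMAT).date()
        start = previous_end + timedelta(days=1)
    # Data for the current day may be incomplete, so stop at yesterday.
    end = today - timedelta(days=1)
    return start.strftime(DATE_FORMAT), end.strftime(DATE_FORMAT), first_release


first = next_release_window(None, dag_start=date(2020, 1, 1), today=date(2020, 1, 10))
second = next_release_window(first, dag_start=date(2020, 1, 1), today=date(2020, 1, 17))
print(first)
print(second)
```

Keeping the pushed value to plain strings and a boolean means it can be serialised as JSON, which is what the comment in get_release_info recommends for Airflow 2.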
Using Airflow variables and connections
Any information that should not be hardcoded inside the workflow, but is still required for the workflow to function
can be passed on using Airflow variables and connections.
Both variables and connections can be added by defining them in the relevant config file (config.yaml in the local develop environment and config-terraform.yaml in the deployed terraform environment).
Each variable or connection that is defined in the config file is made into an Airflow variable or connection when starting the observatory environment.
How these variables and connections are created depends on the type of observatory environment.
In the local develop environment, environment variables are created for Airflow variables and connections.
These environment variables are made up of the AIRFLOW_VAR_ or AIRFLOW_CONN_ prefix and the name that is used for the variable or connection in the config file.
The prefixes are determined by Airflow and any environment variables with these prefixes will automatically be picked up; see the Airflow documentation for more info on managing variables and connections with environment variables.
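As a rough illustration of the naming scheme, the sketch below builds the environment variable name for a variable or connection. The helper airflow_env_name is hypothetical, and upper-casing the name follows Airflow's documented convention for environment variables; how the Observatory Platform itself constructs these names is not shown here.

```python
import os

# Prefixes that Airflow recognises for variables and connections.
PREFIXES = {"variable": "AIRFLOW_VAR_", "connection": "AIRFLOW_CONN_"}


def airflow_env_name(kind: str, name: str) -> str:
    """Build the environment variable name for an Airflow variable or connection."""
    return PREFIXES[kind] + name.upper()


# Expose an example variable to any process started from this environment.
os.environ[airflow_env_name("variable", "new_variable")] = "my-variable-value"
print(airflow_env_name("connection", "new_connection"))
```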
In the deployed terraform environment, the Google Cloud Secret Manager is used as a backend to store both Airflow
variables and connections, because this is more secure than using environment variables.
A secret is created for each individual Airflow variable or connection, see the Airflow documentation for more info
on the secrets backend.
Variables
Airflow variables should never contain any sensitive information. Example uses include the project_id, bucket names or data location.
Connections
Airflow connections can contain sensitive information and are often used to store credentials like API keys or usernames and passwords. In the local development environment, the Airflow connections are stored in the metastore database. There, the passwords inside the connection configurations are encrypted using Fernet. The value for the Airflow connection should always be a connection URI, see the Airflow documentation for more detailed information on how to construct this URI.
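Because credentials often contain characters that are reserved in URIs, it can help to build the connection URI programmatically. This is a minimal sketch under the assumption that only a scheme, login, password and host are needed; build_connection_uri is a hypothetical helper, not part of Airflow or the Observatory Platform.

```python
from urllib.parse import quote


def build_connection_uri(scheme: str, login: str, password: str, host: str) -> str:
    """Percent-encode the credentials and assemble a connection URI."""
    # safe="" makes sure that characters such as "/" and "@" are encoded as well.
    encoded_login = quote(login, safe="")
    encoded_password = quote(password, safe="")
    return f"{scheme}://{encoded_login}:{encoded_password}@{host}"


print(build_connection_uri("http", "my-username", "p@ss/word", "example.com"))
```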
Using a new variable or connection
Any new Airflow variables or connections have to be added to either the AirflowVars or AirflowConns class in the airflow_utils file.
This file can be found at: observatory-platform/observatory/platform/utils/airflow_utils.py
These two classes act as a registry that makes it easy to access the variables and connections in different DAGs.
For each class attribute, the attribute name is used inside the workflow and the value is used inside the config.yaml or config-terraform.yaml file.
For example, to add the airflow variable ‘new_variable’ and connection ‘new_connection’, the relevant classes are updated like this:
# Inside observatory-platform/observatory/platform/utils/airflow_utils.py
class AirflowVars:
    """Common Airflow Variable names used with the Observatory Platform"""

    # add to existing variables
    NEW_VARIABLE = "new_variable"


class AirflowConns:
    """Common Airflow Connection names used with the Observatory Platform"""

    # add to existing connections
    NEW_CONNECTION = "new_connection"
The variable or connection can then be used inside the workflow like this:
# Inside my-dags/my_dags/workflows/my_workflow.py
from observatory.platform.utils.airflow_utils import AirflowVars, AirflowConns
airflow_conn = AirflowConns.NEW_CONNECTION
airflow_var = AirflowVars.NEW_VARIABLE
The relevant section of both the config.yaml and config-terraform.yaml files will look like this:
# User defined Apache Airflow variables:
airflow_variables:
  new_variable: my-variable-value

# User defined Apache Airflow Connections:
airflow_connections:
  new_connection: http://my-username:my-password@
3. Creating a BigQuery schema file
BigQuery database schema JSON files are stored in my-dags/my_dags/database/schema.
They follow the format: <table_name>_YYYY-MM-DD.json.
An additional custom version can be provided together with the date, in which case the files should follow the format: <table_name>_<customversion>_YYYY-MM-DD.json.
The BigQuery table loading utility functions in the Observatory Platform will try to find the correct schema to use for loading table data, based on the release date and custom version.
If no version is specified, the most recent schema with a date less than or equal to the release date of the data is returned.
If a version string is specified, the most current (date) schema in that series is returned.
The utility functions are used by the BigQuery load tasks of the sub templates (Snapshot, Stream, Organisation) and it is required to set the schema_version parameter to automatically pick up the schema version when using these templates.
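The selection rules above can be sketched as a small function operating on a list of filenames. This is an illustration of the described behaviour only, not the Observatory Platform's implementation; pick_schema and its parameters are hypothetical names.

```python
import re
from datetime import date
from typing import List, Optional

# Matches <prefix>_YYYY-MM-DD.json, where prefix is the table name plus an optional version.
SCHEMA_PATTERN = re.compile(r"^(?P<prefix>.+)_(?P<date>\d{4}-\d{2}-\d{2})\.json$")


def pick_schema(
    filenames: List[str], table_name: str, release_date: date, version: Optional[str] = None
) -> Optional[str]:
    """Pick the schema file for a table, following the rules described above."""
    prefix = f"{table_name}_{version}" if version else table_name
    candidates = []
    for filename in filenames:
        match = SCHEMA_PATTERN.match(filename)
        if match and match.group("prefix") == prefix:
            candidates.append((date.fromisoformat(match.group("date")), filename))
    if not version:
        # Without a version, only schemas dated on or before the release date qualify.
        candidates = [item for item in candidates if item[0] <= release_date]
    # The most recent remaining schema wins; None means that no schema was found.
    return max(candidates)[1] if candidates else None


FILES = [
    "my_table_2020-01-01.json",
    "my_table_2021-01-01.json",
    "my_table_v2_2019-06-01.json",
    "my_table_v2_2022-06-01.json",
]
print(pick_schema(FILES, "my_table", date(2020, 6, 1)))
print(pick_schema(FILES, "my_table", date(2020, 6, 1), version="v2"))
```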
4. Creating a test file
The Observatory Platform uses the unittest Python framework as a base and provides additional methods to run tasks and test DAG structure.
It also uses the Python coverage package to analyse test coverage.
To ensure that the workflow works as expected and to pick up any changes in the code base that would break the workflow, it is required to add unit tests that cover the code in the developed workflow.
The test files for workflows are stored in my-dags/tests/workflows.
The ObservatoryTestCase class in the observatory-platform/observatory/platform/utils/test_utils.py file contains common test methods and should be used as a parent class for the unit tests instead of unittest.TestCase.
Additionally, the ObservatoryEnvironment class in the test_utils.py file can be used to simulate the Airflow environment and the different workflow tasks can be run and tested inside this environment.
Testing DAG structure
The workflow’s DAG structure can be tested through the assert_dag_structure method of ObservatoryTestCase.
The DAG object is compared against a dictionary, where the key is the source node, and the value is a list of sink nodes.
This expresses the relationship that the source node task is a dependency of all of the sink node tasks.
Example:
import pendulum
from observatory.platform.utils.test_utils import ObservatoryTestCase
from observatory.platform.workflows.workflow import Release, Workflow


class MyWorkflow(Workflow):
    def __init__(
        self,
        dag_id: str = "my_workflow",
        start_date: pendulum.DateTime = pendulum.datetime(2017, 3, 20),
        schedule_interval: str = "@weekly",
    ):
        super().__init__(dag_id, start_date, schedule_interval)

        self.add_task(self.task1)
        self.add_task(self.task2)

    def make_release(self, **kwargs) -> Release:
        snapshot_date = kwargs["execution_date"]
        return Release(self.dag_id, snapshot_date)

    def task1(self, release, **kwargs):
        pass

    def task2(self, release, **kwargs):
        pass


class MyTestClass(ObservatoryTestCase):
    """Tests for the workflow"""

    def __init__(self, *args, **kwargs):
        """Constructor which sets up variables used by tests.

        :param args: arguments.
        :param kwargs: keyword arguments.
        """

        super(MyTestClass, self).__init__(*args, **kwargs)

    def test_dag_structure(self):
        """Test that the DAG has the correct structure.

        :return: None
        """

        expected = {"task1": ["task2"], "task2": []}
        workflow = MyWorkflow()
        dag = workflow.make_dag()
        self.assert_dag_structure(expected, dag)
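For workflows with many tasks, writing the expected dictionary by hand is error-prone. The sketch below generates it for a purely linear workflow, assuming that tasks are chained in the order in which they are added, as in the example above. The helper linear_dag_structure is hypothetical and does not cover DAGs that branch.

```python
from typing import Dict, List


def linear_dag_structure(task_ids: List[str]) -> Dict[str, List[str]]:
    """Map each task to a list holding the task that follows it; the last task maps to []."""
    structure: Dict[str, List[str]] = {}
    for index, task_id in enumerate(task_ids):
        # Slicing past the end of the list yields [], which is what the final task needs.
        structure[task_id] = task_ids[index + 1 : index + 2]
    return structure


print(linear_dag_structure(["task1", "task2"]))
```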
Testing DAG loading
To test if a DAG loads from a DagBag, the assert_dag_load method can be used within an ObservatoryEnvironment.
Example:
import os

import pendulum
from observatory.platform.utils.config_utils import module_file_path
from observatory.platform.utils.test_utils import ObservatoryTestCase, ObservatoryEnvironment
from observatory.platform.workflows.workflow import Release, Workflow


class MyWorkflow(Workflow):
    def __init__(
        self,
        dag_id: str = "my_workflow",
        start_date: pendulum.DateTime = pendulum.datetime(2017, 3, 20),
        schedule_interval: str = "@weekly",
    ):
        super().__init__(dag_id, start_date, schedule_interval)

        self.add_task(self.task1)
        self.add_task(self.task2)

    def make_release(self, **kwargs) -> Release:
        snapshot_date = kwargs["execution_date"]
        return Release(self.dag_id, snapshot_date)

    def task1(self, release, **kwargs):
        pass

    def task2(self, release, **kwargs):
        pass


class MyTestClass(ObservatoryTestCase):
    """Tests for the workflow"""

    def __init__(self, *args, **kwargs):
        """Constructor which sets up variables used by tests.

        :param args: arguments.
        :param kwargs: keyword arguments.
        """

        super(MyTestClass, self).__init__(*args, **kwargs)

    def test_dag_load(self):
        """Test that the DAG can be loaded from a DAG bag.

        :return: None
        """

        with ObservatoryEnvironment().create():
            dag_file = os.path.join(module_file_path("my_dags.dags"), "my_workflow.py")
            self.assert_dag_load("my_workflow", dag_file)
Testing workflow tasks
To run and test a workflow task, the run_task method can be used within an ObservatoryEnvironment.
The ObservatoryEnvironment is used to simulate the Airflow environment.
To ensure that a workflow can be run from end to end, the ObservatoryEnvironment creates additional resources. It will:
Create a temporary local directory.
Set the OBSERVATORY_HOME environment variable.
Initialise a temporary Airflow database.
Create download and transform Google Cloud Storage buckets.
Create BigQuery dataset(s).
Create default Airflow Variables: AirflowVars.DATA_PATH, AirflowVars.PROJECT_ID, AirflowVars.DATA_LOCATION, AirflowVars.DOWNLOAD_BUCKET and AirflowVars.TRANSFORM_BUCKET.
Create an ObservatoryApiEnvironment.
Start an Elastic environment.
Clean up all resources when the environment is closed.
Note that if the unit test is stopped with a forced interrupt, the code block to clean up the created storage buckets and datasets will not be executed and those resources will have to be manually removed.
The run dependencies that are imposed on each task by the DAG structure are preserved in the test environment.
This means that to run a specific task, all the previous tasks in the DAG have to run successfully before that task within the same create_dag_run environment.
Example:
import pendulum
from observatory.platform.utils.test_utils import ObservatoryTestCase, ObservatoryEnvironment
from observatory.platform.workflows.workflow import Release, Workflow


class MyWorkflow(Workflow):
    def __init__(
        self,
        dag_id: str = "my_workflow",
        start_date: pendulum.DateTime = pendulum.datetime(2017, 3, 20),
        schedule_interval: str = "@weekly",
    ):
        super().__init__(dag_id, start_date, schedule_interval)

        self.add_task(self.task1)
        self.add_task(self.task2)

    def make_release(self, **kwargs) -> Release:
        snapshot_date = kwargs["execution_date"]
        return Release(self.dag_id, snapshot_date)

    def task1(self, release, **kwargs):
        pass

    def task2(self, release, **kwargs):
        pass


class MyTestClass(ObservatoryTestCase):
    """Tests for the workflow"""

    def __init__(self, *args, **kwargs):
        """Constructor which sets up variables used by tests.

        :param args: arguments.
        :param kwargs: keyword arguments.
        """

        super(MyTestClass, self).__init__(*args, **kwargs)
        self.execution_date = pendulum.datetime(2020, 1, 1)

    def test_workflow(self):
        """Test the workflow end to end.

        :return: None.
        """

        # Setup Observatory environment
        env = ObservatoryEnvironment()

        # Setup Workflow
        workflow = MyWorkflow()
        dag = workflow.make_dag()

        # Create the Observatory environment and run tests
        with env.create():
            with env.create_dag_run(dag, self.execution_date):
                # Run task1
                env.run_task(workflow.task1.__name__)
Temporary GCP datasets
Unit testing frameworks often run tests in parallel, so there is no guarantee of execution order.
When running code that modifies datasets or tables in the Google Cloud, it is recommended to create temporary datasets for each task to prevent any bugs caused by race conditions.
The ObservatoryEnvironment has a method called add_dataset that can be used to create a new dataset in the linked project for the duration of the environment.
Observatory Platform API
Some workflows make use of the Observatory Platform API in order to fetch necessary metadata. When writing unit tests for workflows that use the platform API, it is necessary to use an isolated API environment where the relevant WorkflowType, Organisations and Telescope exist. The ObservatoryEnvironment that is mentioned above can be used to achieve this. An API session is started when creating the ObservatoryEnvironment and the WorkflowType, Organisations and Telescope can all be added to this session.
Example:
import pendulum
from airflow.models.connection import Connection
from my_dags.utils.identifiers import WorkflowTypes
from observatory.api.server import orm
from observatory.platform.utils.airflow_utils import AirflowConns
from observatory.platform.utils.test_utils import ObservatoryEnvironment
dt = pendulum.now("UTC")
# Create observatory environment
env = ObservatoryEnvironment()
# Add the Observatory API connection, used from make_observatory_api() in DAG file
conn = Connection(conn_id=AirflowConns.OBSERVATORY_API, uri="http://:password@host:port")
env.add_connection(conn)
# Create telescope type with API
workflow_type = orm.WorkflowType(name="ONIX Telescope", type_id=WorkflowTypes.onix, created=dt, modified=dt)
env.api_session.add(workflow_type)
# Create organisation with API
organisation = orm.Organisation(name="Curtin Press", created=dt, modified=dt)
env.api_session.add(organisation)
# Create workflow with API
workflow = orm.Telescope(
    name="Curtin Press ONIX Telescope",
    workflow_type=workflow_type,
    organisation=organisation,
    modified=dt,
    created=dt,
)
env.api_session.add(workflow)
# Commit changes
env.api_session.commit()
5. Creating a documentation file
The Observatory Platform builds documentation using Sphinx.
Documentation is contained in the docs directory.
Currently, index pages are written in RST (reStructuredText) format and content pages are written in Markdown for simplicity.
The documentation can be built with the following commands:
cd docs
make html
This will output HTML documentation in the docs/_build/html directory and the file docs/_build/html/index.html can be opened in a browser to preview what the documentation will look like.
A documentation file with info on the workflow should be added in the my-dags/docs directory.
This documentation should at least include:
A short summary on the data source.
A summary table, see example below.
Any details on set-up steps that are required to run this workflow.
Info on any Airflow connections and variables that are used (see further below).
The latest schema.
Example of a summary table using eval_rst to format the RST table:
```eval_rst
+------------------------------+---------+
| Summary                      |         |
+==============================+=========+
| Average runtime              | 10 min  |
+------------------------------+---------+
| Average download size        | 500 MB  |
+------------------------------+---------+
| Harvest Type                 | API     |
+------------------------------+---------+
| Harvest Frequency            | Monthly |
+------------------------------+---------+
| Runs on remote worker        | True    |
+------------------------------+---------+
| Catchup missed runs          | True    |
+------------------------------+---------+
| Table Write Disposition      | Truncate|
+------------------------------+---------+
| Update Frequency             | Monthly |
+------------------------------+---------+
| Credentials Required         | No      |
+------------------------------+---------+
| Uses Workflow Template       | Snapshot|
+------------------------------+---------+
| Each shard includes all data | Yes     |
+------------------------------+---------+
```
Including Airflow variable/connection info in documentation
If a newly developed workflow uses an Airflow connection or variable, this should be explained in the workflow documentation. An example of the variable/connection is required as well as an explanation on how the value for this variable/connection can be obtained.
See for example this info section on the Airflow connection required with the google_books workflow:
Airflow connections
Note that all values need to be urlencoded. In the config.yaml file, the following airflow connection is required:
sftp_service
sftp_service: ssh://<username>:<password>@<host>?host_key=<host_key>
The sftp_service airflow connection is used to connect to the sftp_service and download the reports.
The username and password are created by the sftp service and the host is e.g. oaebu.exavault.com.
The host key is optional; you can get it by running ssh-keyscan, e.g.:
ssh-keyscan oaebu.exavault.com
Including schemas in documentation
The documentation build system automatically converts all the schema files from my-dags/my_dags/database/schema into CSV files.
These are temporarily stored in the docs/schemas folder.
The CSV files have the same filename as the original schema files, except for the suffix, which is changed to csv.
If there are multiple schemas for the same workflow, the _latest suffix can be used to always get the latest version of the schema.
The schemas folder is cleaned up as part of the build process, so this directory is not normally visible, but it can be made visible by disabling the cleanup code in the Makefile.
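To give an idea of what such a conversion involves, the sketch below flattens a BigQuery schema JSON document into CSV rows using only the standard library. It is not the converter used by the documentation build; the function names, the chosen columns and the dotted names for nested fields are all assumptions made for this illustration.

```python
import csv
import io
import json
from typing import Dict, Iterator, List

COLUMNS = ["name", "type", "mode", "description"]


def flatten_fields(fields: List[Dict], parent: str = "") -> Iterator[Dict[str, str]]:
    """Yield one row per schema field, using dotted names for nested fields."""
    for field in fields:
        name = f"{parent}.{field['name']}" if parent else field["name"]
        yield {
            "name": name,
            "type": field.get("type", ""),
            "mode": field.get("mode", "NULLABLE"),  # BigQuery treats a missing mode as NULLABLE
            "description": field.get("description", ""),
        }
        # RECORD fields carry their children in a nested "fields" list.
        yield from flatten_fields(field.get("fields", []), parent=name)


def schema_json_to_csv(schema_json: str) -> str:
    """Convert the text of a schema JSON file into CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=COLUMNS, lineterminator="\n")
    writer.writeheader()
    writer.writerows(flatten_fields(json.loads(schema_json)))
    return buffer.getvalue()


EXAMPLE_SCHEMA = json.dumps(
    [
        {"name": "id", "type": "STRING", "mode": "REQUIRED", "description": "Identifier"},
        {"name": "author", "type": "RECORD", "fields": [{"name": "name", "type": "STRING"}]},
    ]
)
print(schema_json_to_csv(EXAMPLE_SCHEMA))
```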
To include a schema in the documentation markdown file, it is necessary to embed some RST that loads a table from a CSV file.
Since the recommonmark package is used, this can be done with an eval_rst code block that contains RST:
``` eval_rst
.. csv-table::
:file: /path/to/schema_latest.csv
:width: 100%
:header-rows: 1
```
To determine the correct file path, it is recommended to construct a relative path to the docs/schemas directory from the directory of the markdown file.
For example, if the markdown file resides in my-dags/docs/workflows/my_workflow.md
and the schema file path is my-dags/my_dags/database/schema/my_workflow_2021-01-01.json
then the correct file path that should be used in the RST code block is
:file: ../schemas/my_workflow_latest.csv
The .. refers to the parent directory; it is needed once to reach docs from my-dags/docs/workflows/my_workflow.md.
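The relative path can also be computed instead of worked out by hand. This is a small sketch that assumes the generated CSV files live in my-dags/docs/schemas; schema_csv_path is a hypothetical helper and posixpath is used so that the result always contains forward slashes.

```python
import posixpath


def schema_csv_path(markdown_file: str, table_name: str, docs_dir: str = "my-dags/docs") -> str:
    """Return the path of the latest schema CSV relative to the markdown file's directory."""
    csv_file = posixpath.join(docs_dir, "schemas", f"{table_name}_latest.csv")
    return posixpath.relpath(csv_file, start=posixpath.dirname(markdown_file))


print(schema_csv_path("my-dags/docs/workflows/my_workflow.md", "my_workflow"))
```

A markdown file placed directly in my-dags/docs would need no .. at all, since the schemas folder is then a sibling of the file.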