Automate Your Data Warehouse with Airflow on Google Cloud Platform

Ilham Maulana Putra
SelectFrom
Published in
6 min readJan 22, 2022

--

Photo by Clément Hélardot on Unsplash

After six months of learning how to be a data engineer, I found a tool that I found most useful. Apache Airflow is an open-source workflow management platform for data engineering pipelines. It started at Airbnb in October 2014 as a solution to manage the company’s increasingly complex workflows (Wikipedia). I started learning Airflow two months ago and I think the best way to start learning something is by doing it.

This article details the project that I build to learn Airflow.

In this project, we will build a data warehouse on Google Cloud Platform with Airflow. We will experience first-hand how to build a DAG to achieve a common data engineering task: extract data from sources such as Postgres and MySQL, load it to a data sink or data lake (Google Cloud Storage), transform it, and load it into a data warehouse (Big Query).

Set up Airflow locally or with Docker.

You can check out posts on how to set up Airflow locally or with docker.

The dataset: Indonesian Food Recipes | Kaggle

This Kaggle dataset contains eight datasets of Indonesian Food recipes. But, we will only use six of them.

2 Datasets in Cloud SQL (MYSQL) GCP
2 Datasets in PostgreSQL
2 Datasets in My Local PC

Architecture

The Project Architecture

Data source 1 : Cloud SQL GCP (MySQL)

Data source 2 : PostgreSQL

Data source 3 : local file

  1. Extracting data: First, we need to ingest our recipe data from the databases to Google Cloud Storage with Airflow. This task requires a setup connection between Airflow and the databases. You can check out blog posts online that will guide you on how to set up the connection for Google Cloud Connection, MySQL, and PostgreSQL.
  2. Transforming data: In this task, we will transform our dataset using BigQuery in GCPTransforming.
  3. Loading data: After transforming the data, we can use the data for analytics using Big Query and Google Data Studio.

Data Pipelines

Data Pipelines Orchestration with Airflow

The data pipeline above is built by creating the DAG Airflow file. Check my GitHub Repo for this project.

A DummyOperator called start_pipeline will kick off after we trigger our DAG followed by six loads of ingesting data operation from the data sources to Google Cloud Storage and the files loaded as CSV. The TransferOperator will transfer the datasets from Google Cloud Storage to Big Query and there is also BigQueryCheckOperator to check the rows after loading it to Big Query.

Next, the pipelines will create a Dimension table from each of the recipe datasets with added column Main_Ingredients. Then the last operator creates a Fact table by merging all the dimension tables.

THE DAG FILE

Import the Airflow Libraries and Create Constant

The first thing we need to do when creating a DAG File is import the required Airflow and Python libraries. For Airflow libraries, you can check the Astronomer Website.

Define the Python Hook Function

After importing the libraries, we need to define the Python hook function for our PythonOperator. The Python hook function is a function for transferring our datasets from databases like Cloud SQL and PostgreSQL to Google Cloud Storage. This function will execute using our PythonOperator.

Define the DAG

For the next step, we need to define our DAG. We should give it the default arguments such as start_date, retries, retry_delay, etc. We can also define the required DAG variables to pass in variables instead of defining them in the DAG definition code.

Define Ingest Data Tasks from Multiple Data Sources to GCS

As you can see, we will be using FiletoGoogleCloudStorageOperator to transfer the datasets from local file data sources to Google Cloud Storage. We are also using the Python operator in this task that will execute our Python hook function we defined earlier.

Define Load Tasks From GCS to Big Query

In this example, we load the datasets to the BigQuery staging table. All six datasets will create six staging tables. Please notice that for this example we are using the CSV format. We should provide a schema field to ensure that data is loaded in the expected format.

Define Data Check Tasks Big Query

In this task, we just have to make sure our previous tasks ran as expected. In our data check tasks, we only need to check the row count of our tables to verify if any row exited.

Create Fact and Dim Tables

After the staging datasets are loaded and checked. We are going to use the BigQueryOperator to create Fact and Dim Tables by executing SQL queries to transform and create fact and dim tables.

Define Tasks Dependencies

For the final part, we need to define task dependencies by using the >> or << operator.

The Data Lake and Data Warehouse

After we run the DAG and the DAG status is a success, we can check our data lake in Google Cloud Storage and the data warehouse in Big Query Google Cloud Platform.

The data lake in Google Cloud Storage is useful to store our raw datasets as a backup.

Data Lake in GCS

Look at this staging dataset. In the staging dataset, our data is still not being transformed. That is why we need an SQL query with BigQueryOperator to transform and create a fact and dim table.

The Staging Dataset

After we succeed in running our DAG, this is the final outcome of our data warehouse. Look at the schema differences between dimension and fact tables.

Dim Table Schema
Fact Table Schema

Now, let’s try to find out which recipe has the most likes from the user. We can do this by executing an SQL query in Big Query.

Google Data Studio

We can also build a dashboard containing important data from our table in Big Query using Google Data Studio.

Summary

In this article, I showed you how to automate data warehousing on Google Cloud Platform with Airflow. I know I can’t cover everything in this article because I’m still learning how to be a data engineer. Any critical feedback is welcome. I hope this article can be a source of encouragement for you and I to keep learning how to be a data engineer.

The world’s fastest cloud data warehouse:

When designing analytics experiences which are consumed by customers in production, even the smallest delays in query response times become critical. Learn how to achieve sub-second performance over TBs of data with Firebolt.

--

--