Managed Airflow: Running a Basic Task on AWS, GCP, and Astronomer

Intro

The leading solution for orchestrating jobs in modern data engineering environments is Apache Airflow, an open-source orchestration platform created at Airbnb and later donated to the Apache Software Foundation. Organizations have many options for deploying Airflow. Self-hosting generally involves using the official Helm chart to deploy Airflow on-premises or on a managed Kubernetes service like EKS. Organizations seeking to avoid the operational overhead of maintaining an Airflow deployment will generally select from hosted offerings that provide Airflow as a service.

The three leading hosted solutions are:

  1. Google Cloud Composer (GCC) on GCP
  2. Amazon Managed Workflows for Apache Airflow (MWAA) on AWS
  3. Astronomer (Astro)

This article provides only brief background on each service, focusing instead on the steps required to get a basic Airflow task running on each platform. Wherever possible, I will use infrastructure-as-code practices rather than the online consoles. If you are seeking an article focused on the platform trade-offs, I suggest reading Paul Fry’s article on managed Airflow solutions.

Task Definition

First, let’s define a local Airflow task and ensure it behaves as expected. The demo DAG will hit the NBA API to gather all games from the most recent season, select the game with the highest single-team point total, and send that game’s ID as a Slack message through a configured Slack connection.

import os

import pendulum
from airflow.decorators import dag, task
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from nba_api.stats.endpoints import leaguegamefinder

# Mac local fix: setting no_proxy avoids system proxy lookups that can
# break requests made from forked Airflow task processes on macOS.
os.environ["no_proxy"] = "*"


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1),
    catchup=False,
)
def game_alerter():
    @task()
    def find_max_pts_game_id():
        # Pull team-level game logs for the 2022-23 NBA regular season.
        games_df = leaguegamefinder.LeagueGameFinder(
            player_or_team_abbreviation="T",
            season_nullable="2022-23",
            league_id_nullable="00",
        ).league_game_finder_results.get_data_frame()
        # Return the ID of the game with the highest single-team point total.
        max_game_id = games_df.loc[games_df["PTS"].idxmax(), "GAME_ID"]
        return max_game_id

    max_game_id = find_max_pts_game_id()
    # Passing the TaskFlow output as `text` wires the task dependency automatically.
    slack = SlackAPIPostOperator(
        task_id="send_max_game_id_to_slack",
        channel="#general",
        text=max_game_id,
        slack_conn_id="slack_api_default",
    )


game_alerter()

Install Airflow locally and set up a Slack API connection to verify the sample task works. In this article, we’ll assume this file lives at dags/example.py.
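For a quick local check, one approach (a sketch, assuming a local Airflow installation and a Slack API token; the token value is a placeholder) is to register the connection through the Airflow CLI and run a single DAG execution with airflow dags test:

# Register the Slack API connection (the token is a placeholder).
airflow connections add slack_api_default \
    --conn-type slack \
    --conn-password "<your_slack_api_token>"

# Run the DAG once locally, without a scheduler.
airflow dags test game_alerter 2023-01-01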

Google Cloud Composer (GCC)

Google Cloud Composer (GCC) is GCP’s hosted Airflow offering, introduced in July 2018. The entire GCC configuration can be expressed in Terraform using the Google provider. First, let’s define some Terraform variables for storing project and connection information. Note that this is not best practice for the Airflow connection string - in production, a secrets backend should be used to store our Slack API authentication info.

variable "project" {
  type = string
  default = "<your_project>"
}

variable "project_num" {
  type = string
  default = "<your_project_num>"
}

variable "airflow_slack_conn" {
  type = string
  default = "<your_airflow_slack_conn_string>"
}

Now, set up the GCC environment. Note that we use the pypi_packages configuration to install the NBA API library and the Airflow Slack provider module, and the env_variables config to make the Slack connection available to Airflow.

provider "google" {
  project = var.project
  region  = "us-central1"
}

resource "google_project_service" "composer_api" {
  provider = google
  project  = var.project
  service  = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  disable_on_destroy = false
}

resource "google_project_iam_member" "service_agent_role" {
  provider = google
  project  = var.project
  member   = "serviceAccount:service-${var.project_num}@cloudcomposer-accounts.iam.gserviceaccount.com"
  role     = "roles/composer.ServiceAgentV2Ext"
}

resource "google_composer_environment" "example_environment" {
  provider = google
  name = "test-environment"
  config {
    software_config {
      image_version = "composer-2.4.4-airflow-2.5.3"
			pypi_packages = {
					nba_api = ""
					apache-airflow-providers-slack = ""
			}
			env_variables = {
					AIRFLOW_CONN_SLACK_API_DEFAULT = var.airflow_slack_conn
			}
    }

  }
}

Finally, we upload our DAG to the Cloud Storage bucket created for the Composer environment, indicating to GCC that it should register this DAG.

# Dynamically upload our test DAG to the Cloud Storage bucket.
resource "google_storage_bucket_object" "dag" {
  name         = "dags/example.py"
  source       = "dags/example.py"
  content_type = "text/x-python"
  bucket       = element(split("/", google_composer_environment.example_environment.config[0].dag_gcs_prefix), 2)
}

After applying this Terraform configuration (environment creation takes roughly 25 minutes), we can trigger our DAG using the Airflow UI linked from the GCP console, or with the Airflow CLI via the gcloud utility.
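For example, a CLI-based trigger might look like the following sketch, which passes the Airflow dags trigger command through to the environment (the environment name and region match the Terraform above):

gcloud composer environments run test-environment \
    --location us-central1 \
    dags trigger -- game_alerter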

Don’t forget to clean up your resources. Note that not all resources are accounted for in the Terraform code provided. When an environment is created, GCP also creates a storage bucket for that environment’s data and allocates Compute Engine disks for the Redis queue backing the Celery executor used by Cloud Composer. If we want to shut everything down in one go, we can take advantage of the Terraform export functionality provided by GCP. Run the following command to print importable Terraform definitions to stdout:

gcloud beta resource-config bulk-export --resource-types=StorageBucket,ComputeDisk --project=... --resource-format terraform

Then, add these definitions to our configuration and bring each resource under management with the terraform import command. Now we can deallocate all resources at once with a single terraform destroy invocation.
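Illustratively, the imports take the following shape; the resource addresses and IDs here are placeholders that should come from the bulk-export output:

# Bring the exported resources under Terraform management (placeholder addresses/IDs).
terraform import google_storage_bucket.composer_bucket <bucket_name>
terraform import google_compute_disk.composer_redis_disk <disk_id>

# Tear everything down in one pass.
terraform destroy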

Amazon Managed Workflows (MWAA)

Amazon introduced its hosted Airflow offering, Managed Workflows for Apache Airflow (MWAA), in November 2020. We’ll begin our MWAA setup with the Amazon-provided CloudFormation template for MWAA. This constructs the basic cloud resources supporting MWAA, including a VPC, an MWAA environment, an execution role, and an S3 bucket. After creating the stack, upload our DAG file and a requirements.txt dependency file to the newly created S3 bucket (an example upload follows the snippet below). Then, we’ll add configuration informing MWAA how to find the requirements file, and configure the secrets backend, which is required for instantiating our Slack connection:

      RequirementsS3ObjectVersion: <requirements_object_version>
      RequirementsS3Path: requirements.txt
      AirflowConfigurationOptions:
        secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
        secrets.backend_kwargs: '{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'
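The upload step mentioned above can be done with the AWS CLI, for example (the bucket name is a placeholder for the bucket created by the stack):

aws s3 cp dags/example.py s3://<your_mwaa_bucket>/dags/example.py
aws s3 cp requirements.txt s3://<your_mwaa_bucket>/requirements.txt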

Now, add a new secret containing our Airflow-formatted Slack connection string:

SlackConnectionSecret:
  Type: AWS::SecretsManager::Secret
  Properties:
    Name: airflow/connections/slack_api_default
    SecretString: <your_airflow_slack_conn_string>

Next, ensure the execution role has access to Secrets Manager by adding the SecretsManagerReadWrite managed policy to the role:

ManagedPolicyArns:
  - arn:aws:iam::aws:policy/SecretsManagerReadWrite

Lastly, the NBA API unfortunately blocks requests from AWS IP addresses. We can work around this by using a $3 proxy from IPRoyal and providing the proxy credentials to the LeagueGameFinder call in our Airflow task; one way to make those credentials available to the task is sketched below. After all of these changes, we are able to trigger and run the Airflow job via the Airflow UI.
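As a sketch, and assuming the task is modified to read the proxy URL from an Airflow variable (the secret name follows the variables_prefix configured above; the credential values are placeholders), the proxy could be stored in Secrets Manager like so:

aws secretsmanager create-secret \
    --name airflow/variables/nba_proxy \
    --secret-string "http://<proxy_user>:<proxy_password>@<proxy_host>:<proxy_port>"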

Astronomer (Astro)

Astronomer is a company focused exclusively on products in the orchestration space, with Airflow as the underlying technology. It announced its hosted Airflow offering in 2018 and has since raised roughly $300M to support the effort. Creating an Astronomer deployment involves the Astro CLI and a deployment file. We can create a basic deployment using the following configuration:

deployment:
  configuration:
    name: test
    description: ""
    runtime_version: 9.1.0
    dag_deploy_enabled: false
    ci_cd_enforcement: false
    scheduler_size: small
    is_high_availability: false
    executor: CeleryExecutor
    scheduler_au: 10
    scheduler_count: 1
    cluster_name: us-central1
    workspace_name: Phil's Workspace
    deployment_type: HOSTED_SHARED
    cloud_provider: gcp
    region: us-central1

Simply run astro deployment create --deployment-file astr.yaml to construct the deployment. This instantiates the deployment on Astro’s side, enabling us to release DAGs to it.

The deployment process for Astro involves building an Astronomer Runtime image locally and uploading that image to an Astronomer-hosted Docker registry. This build is driven by a project folder, which can be created using the astro dev init command. Once this folder is created, we can:

  1. Populate the requirements.txt file with our dependencies.
  2. Add the AIRFLOW_CONN_SLACK_API_DEFAULT environment variable to the Dockerfile with our Slack connection string.
  3. Move our sample task into the dags/ folder (a shell sketch of these steps follows).
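A minimal sketch of those three steps from the shell, run inside the Astro project folder (the source path and connection string are placeholders):

# 1. Add the DAG's Python dependencies to the project.
echo "nba_api" >> requirements.txt
echo "apache-airflow-providers-slack" >> requirements.txt

# 2. Bake the Slack connection into the runtime image via an environment variable (placeholder value).
echo 'ENV AIRFLOW_CONN_SLACK_API_DEFAULT=<your_airflow_slack_conn_string>' >> Dockerfile

# 3. Copy the sample DAG into the project's dags/ folder.
cp <path_to_repo>/dags/example.py dags/example.py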

Once this is complete, we can build the image and deploy it using the astro deploy command. Note: you will need Docker set up for this to succeed. On my M1 Mac, I used Colima as the Docker runtime. After the deploy completes, we can trigger the task via the Airflow UI or the CLI support provided by Astronomer.