Airflow を BigQuery と繋げる

 
Apache Airflow 使い出したら便利だったのでメモ。

Airflow と BigQuery を繋げる方法を探してたら↓のブログ記事を見つけた。
How to aggregate data for BigQuery using Apache Airflow | Google Cloud Blog

しかし一部 LegacySQL 使ってたりして書き方が気に入らん部分があったので直したのがこちら。

from datetime import timedelta, datetime
import json
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

default_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=10),
    'depends_on_past': True,
    'gcp_conn_id': 'google_cloud_default',
    'use_legacy_sql': False
}

with DAG(
        dag_id='bq_hn_trends',
        default_args=default_args,
        start_date=datetime(2021, 6, 1),
        schedule_interval='0 0 * * *'
) as dag:

    bq_check_githubarchive_day = BigQueryCheckOperator(
        task_id='bq_check_githubarchive_day',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,
        FROM
          `githubarchive.day.{{ yesterday_ds_nodash }}`
        WHERE
          type IN ('WatchEvent', 'ForkEvent')
        """
    )

    bq_check_hackernes_full = BigQueryCheckOperator(
        task_id='bq_check_hackernes_full',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', timestamp) AS date
        FROM
          `bigquery-public-data.hacker_news.full`
        WHERE 1=1
          AND type = 'story'
          AND FORMAT_TIMESTAMP('%Y%m%d', timestamp) = '{{ yesterday_ds_nodash }}'
          LIMIT 1
        """
    )

    bq_write_to_github_daily_metrics = BigQueryOperator(
        task_id='bq_write_to_github_daily_metrics',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        WITH base AS (
          SELECT
            FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,
            actor.id AS actor_id,
            repo.name AS repo,
            type
          FROM
            `githubarchive.day.{{ yesterday_ds_nodash }}`
          WHERE
            type IN ('WatchEvent', 'ForkEvent')
          )
        SELECT
          date,
          repo,
          SUM(CASE WHEN type = 'WatchEvent' THEN 1
            ELSE NULL END) AS stars,
          SUM(CASE WHEN type = 'ForkEvent' THEN 1
            ELSE NULL END) AS forks
        FROM
          base
        GROUP BY date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'
    )

    bq_write_to_github_agg = BigQueryOperator(
        task_id='bq_write_to_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select 
          '{{ yesterday_ds_nodash }}' as date,
          repo,
          sum(stars) as stars_last_28_days,
          sum(
              case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then stars 
              else null end) as stars_last_7_days,
          sum(
              case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then stars 
              else null end) as stars_last_1_day,
          sum(forks) as forks_last_28_days,
          sum(
              case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then forks 
              else null end) as forks_last_7_days,
          sum(
              case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then forks 
              else null end) as forks_last_1_day
        from 
            `my-project-id.github_trends.github_daily_metrics`
        where 
            _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -27) }}') and timestamp('{{ yesterday_ds }}') 
        group by date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_agg${{ yesterday_ds_nodash }}'
    )

    bq_write_to_hackernews_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select
          format_timestamp('%Y%m%d', timestamp) as date,
          `by` as submitter,
          id as story_id,
          regexp_extract(url, "https?://github.com/[^/]*/[^/#?]*") as url,
          sum(score) as score
        from 
          `bigquery-public-data.hacker_news.full`
        where
          type = 'story'
          and timestamp > '{{ yesterday_ds }}'
          and timestamp < '{{ ds }}'
          and url like '%https://github.com%'
          and url not like '%gihub.com/blog/%'
        group by 
          date, submitter, story_id, url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_agg${{ yesterday_ds_nodash }}'
    )

    bq_write_to_hackernews_github_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        with base1 as (
            select * 
            from `my-project-id.github_trends.hackernews_agg`
            where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        ),
        base2 as (
            select 
              repo,
              concat('https://gitnub.com/', repo)as url,
              stars_last_28_days,
              stars_last_7_days,
              stars_last_1_day,
              forks_last_28_days,
              forks_last_7_days,
              forks_last_1_day,
            from `my-project-id.github_trends.github_agg`
            where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        )
        select 
          base1.date as date,
          base1.url as github_url,
          base2.repo as github_repo,
          base1.score as hn_score,
          base1.story_id as hn_story_id,
          base2.stars_last_28_days as stars_last_28_days,
          base2.stars_last_7_days as stars_last_7_days,
          base2.stars_last_1_day as stars_last_1_day,
          base2.forks_last_28_days as forks_last_28_days,
          base2.forks_last_7_days as forks_last_7_days,
          base2.forks_last_1_day as forks_last_1_day
        from base1 left join base2 on base1.url = base2.url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_github_agg${{ yesterday_ds_nodash }}'
    )

    bq_check_hackernews_github_agg = BigQueryCheckOperator(
        task_id='bq_check_hackernews_github_agg',
        sql="""
        SELECT _PARTITIONTIME as pt
        FROM `my-project-id.github_trends.hackernews_github_agg`
        where format_timestamp('%Y%m%d', _PARTITIONTIME) = '{{ yesterday_ds_nodash }}'
        order by 1
        limit 1
        """
    )

    [bq_check_githubarchive_day >> bq_write_to_github_daily_metrics >> bq_write_to_github_agg,
     bq_check_hackernes_full >> bq_write_to_hackernews_agg] >> bq_write_to_hackernews_github_agg >> bq_check_hackernews_github_agg

 
backfill コマンドが Airflow 1.x の時と今使ってる Airflow 2.x とで構文がちょっと変わってた。
2.x だとこうなる。

$ airflow dags backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends

Cloud Composer 使うと高いから、e2-small インスタンスで Airflow を自前で動かしてる。
e2-micro だとメモリ一杯になってフリーズしたw

Airflow のおかげで今まで一所懸命 bash + cron で運用してたのがアホらしくなってしまってもう戻れない。