A quick memo, since Apache Airflow turned out to be really handy once I started using it.
While looking for a way to hook Airflow up to BigQuery, I found the blog post below.
How to aggregate data for BigQuery using Apache Airflow | Google Cloud Blog
But it uses Legacy SQL in places and I wasn't happy with some of how it was written, so here's my cleaned-up version.
from datetime import timedelta, datetime
import json
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
# Defaults applied to every task in the DAG;
# use_legacy_sql=False makes the BigQuery operators run Standard SQL
default_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=10),
    'depends_on_past': True,
    'gcp_conn_id': 'google_cloud_default',
    'use_legacy_sql': False
}
with DAG(
    dag_id='bq_hn_trends',
    default_args=default_args,
    start_date=datetime(2021, 6, 1),
    schedule_interval='0 0 * * *'
) as dag:
    # Check that yesterday's githubarchive daily table exists and
    # contains the star/fork events we aggregate later
    bq_check_githubarchive_day = BigQueryCheckOperator(
        task_id='bq_check_githubarchive_day',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date
        FROM
          `githubarchive.day.{{ yesterday_ds_nodash }}`
        WHERE
          type IN ('WatchEvent', 'ForkEvent')
        """
    )
    # Check that yesterday's Hacker News stories are present in the public dataset
    bq_check_hackernews_full = BigQueryCheckOperator(
        task_id='bq_check_hackernews_full',
        sql="""
        SELECT
          FORMAT_TIMESTAMP('%Y%m%d', timestamp) AS date
        FROM
          `bigquery-public-data.hacker_news.full`
        WHERE 1=1
          AND type = 'story'
          AND FORMAT_TIMESTAMP('%Y%m%d', timestamp) = '{{ yesterday_ds_nodash }}'
        LIMIT 1
        """
    )
    # Write yesterday's star (WatchEvent) and fork (ForkEvent) counts per repo
    # into that day's partition of github_daily_metrics
    bq_write_to_github_daily_metrics = BigQueryOperator(
        task_id='bq_write_to_github_daily_metrics',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        WITH base AS (
          SELECT
            FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,
            actor.id AS actor_id,
            repo.name AS repo,
            type
          FROM
            `githubarchive.day.{{ yesterday_ds_nodash }}`
          WHERE
            type IN ('WatchEvent', 'ForkEvent')
        )
        SELECT
          date,
          repo,
          SUM(CASE WHEN type = 'WatchEvent' THEN 1
              ELSE NULL END) AS stars,
          SUM(CASE WHEN type = 'ForkEvent' THEN 1
              ELSE NULL END) AS forks
        FROM
          base
        GROUP BY date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'
    )
    # Roll up 28-day / 7-day / 1-day star and fork counts per repo
    bq_write_to_github_agg = BigQueryOperator(
        task_id='bq_write_to_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select
          '{{ yesterday_ds_nodash }}' as date,
          repo,
          sum(stars) as stars_last_28_days,
          sum(
            case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then stars
            else null end) as stars_last_7_days,
          sum(
            case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then stars
            else null end) as stars_last_1_day,
          sum(forks) as forks_last_28_days,
          sum(
            case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then forks
            else null end) as forks_last_7_days,
          sum(
            case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then forks
            else null end) as forks_last_1_day
        from
          `my-project-id.github_trends.github_daily_metrics`
        where
          _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -27) }}') and timestamp('{{ yesterday_ds }}')
        group by date, repo
        """,
        destination_dataset_table='my-project-id.github_trends.github_agg${{ yesterday_ds_nodash }}'
    )
    # Aggregate yesterday's Hacker News stories that link to GitHub repos
    bq_write_to_hackernews_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        select
          format_timestamp('%Y%m%d', timestamp) as date,
          `by` as submitter,
          id as story_id,
          regexp_extract(url, "https?://github.com/[^/]*/[^/#?]*") as url,
          sum(score) as score
        from
          `bigquery-public-data.hacker_news.full`
        where
          type = 'story'
          and timestamp >= '{{ yesterday_ds }}'
          and timestamp < '{{ ds }}'
          and url like '%https://github.com%'
          and url not like '%github.com/blog/%'
        group by
          date, submitter, story_id, url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_agg${{ yesterday_ds_nodash }}'
    )
    # Join yesterday's HN story aggregates with the GitHub repo metrics
    bq_write_to_hackernews_github_agg = BigQueryOperator(
        task_id='bq_write_to_hackernews_github_agg',
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        sql="""
        with base1 as (
          select *
          from `my-project-id.github_trends.hackernews_agg`
          where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        ),
        base2 as (
          select
            repo,
            concat('https://github.com/', repo) as url,
            stars_last_28_days,
            stars_last_7_days,
            stars_last_1_day,
            forks_last_28_days,
            forks_last_7_days,
            forks_last_1_day
          from `my-project-id.github_trends.github_agg`
          where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')
        )
        select
          base1.date as date,
          base1.url as github_url,
          base2.repo as github_repo,
          base1.score as hn_score,
          base1.story_id as hn_story_id,
          base2.stars_last_28_days as stars_last_28_days,
          base2.stars_last_7_days as stars_last_7_days,
          base2.stars_last_1_day as stars_last_1_day,
          base2.forks_last_28_days as forks_last_28_days,
          base2.forks_last_7_days as forks_last_7_days,
          base2.forks_last_1_day as forks_last_1_day
        from base1 left join base2 on base1.url = base2.url
        """,
        destination_dataset_table='my-project-id.github_trends.hackernews_github_agg${{ yesterday_ds_nodash }}'
    )
    # Verify that the final table actually got a partition for yesterday
    bq_check_hackernews_github_agg = BigQueryCheckOperator(
        task_id='bq_check_hackernews_github_agg',
        sql="""
        SELECT _PARTITIONTIME as pt
        FROM `my-project-id.github_trends.hackernews_github_agg`
        where format_timestamp('%Y%m%d', _PARTITIONTIME) = '{{ yesterday_ds_nodash }}'
        order by 1
        limit 1
        """
    )
    # Both branches (GitHub metrics and Hacker News aggregation) must finish
    # before the final join and check run
    [bq_check_githubarchive_day >> bq_write_to_github_daily_metrics >> bq_write_to_github_agg,
     bq_check_hackernews_full >> bq_write_to_hackernews_agg] >> bq_write_to_hackernews_github_agg >> bq_check_hackernews_github_agg
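One thing the DAG assumes but doesn't show is that the four destination tables written with the $YYYYMMDD partition decorator already exist as ingestion-time partitioned tables. If you're recreating this setup, something along these lines should create them (the project / dataset / table names here just mirror the ones above, so adjust to taste):
$ bq mk --time_partitioning_type=DAY my-project-id:github_trends.github_daily_metrics
$ bq mk --time_partitioning_type=DAY my-project-id:github_trends.github_agg
$ bq mk --time_partitioning_type=DAY my-project-id:github_trends.hackernews_agg
$ bq mk --time_partitioning_type=DAY my-project-id:github_trends.hackernews_github_agg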
The backfill command's syntax changed a bit between Airflow 1.x and the Airflow 2.x I'm using now.
In 2.x it looks like this.
$ airflow dags backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends
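For reference, in 1.x the equivalent was (roughly, from memory) a subcommand directly on airflow rather than under dags:
$ airflow backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends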
Cloud Composer gets expensive, so I run Airflow myself on an e2-small instance.
On an e2-micro it maxed out the memory and froze lol
Thanks to Airflow, all the effort I used to put into running this with bash + cron now feels silly; there's no going back.