{"id":1540,"date":"2021-07-03T21:53:02","date_gmt":"2021-07-03T12:53:02","guid":{"rendered":"https:\/\/www.kwonline.org\/memo2\/?p=1540"},"modified":"2021-07-12T14:53:05","modified_gmt":"2021-07-12T05:53:05","slug":"connect-airflow-and-bigquery","status":"publish","type":"post","link":"https:\/\/www.kwonline.org\/memo2\/2021\/07\/03\/connect-airflow-and-bigquery\/","title":{"rendered":"Airflow \u3092 BigQuery \u3068\u7e4b\u3052\u308b"},"content":{"rendered":"<p>&nbsp;<br \/>\nApache Airflow \u4f7f\u3044\u51fa\u3057\u305f\u3089\u4fbf\u5229\u3060\u3063\u305f\u306e\u3067\u30e1\u30e2\u3002<\/p>\n<p>Airflow \u3068 BigQuery \u3092\u7e4b\u3052\u308b\u65b9\u6cd5\u3092\u63a2\u3057\u3066\u305f\u3089\u2193\u306e\u30d6\u30ed\u30b0\u8a18\u4e8b\u3092\u898b\u3064\u3051\u305f\u3002<br \/>\n<a href=\"https:\/\/cloud.google.com\/blog\/products\/bigquery\/how-to-aggregate-data-for-bigquery-using-apache-airflow\" rel=\"noopener\" target=\"_blank\">How to aggregate data for BigQuery using Apache Airflow | Google Cloud Blog<\/a><\/p>\n<p>\u3057\u304b\u3057\u4e00\u90e8 LegacySQL \u4f7f\u3063\u3066\u305f\u308a\u3057\u3066\u66f8\u304d\u65b9\u304c\u6c17\u306b\u5165\u3089\u3093\u90e8\u5206\u304c\u3042\u3063\u305f\u306e\u3067\u76f4\u3057\u305f\u306e\u304c\u3053\u3061\u3089\u3002<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom datetime import timedelta, datetime\r\nimport json\r\nfrom airflow import DAG\r\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\r\nfrom airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator\r\n\r\ndefault_args = {\r\n    'retries': 5,\r\n    'retry_delay': timedelta(minutes=10),\r\n    'depends_on_past': True,\r\n    'gcp_conn_id': 'google_cloud_default',\r\n    'use_legacy_sql': False\r\n}\r\n\r\nwith DAG(\r\n        dag_id='bq_hn_trends',\r\n        default_args=default_args,\r\n        start_date=datetime(2021, 6, 1),\r\n        schedule_interval='0 0 * * *'\r\n) as dag:\r\n\r\n    
bq_check_githubarchive_day = BigQueryCheckOperator(\r\n        task_id='bq_check_githubarchive_day',\r\n        sql=&quot;&quot;&quot;\r\n        SELECT\r\n          FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,\r\n        FROM\r\n          `githubarchive.day.{{ yesterday_ds_nodash }}`\r\n        WHERE\r\n          type IN ('WatchEvent', 'ForkEvent')\r\n        &quot;&quot;&quot;\r\n    )\r\n\r\n    bq_check_hackernes_full = BigQueryCheckOperator(\r\n        task_id='bq_check_hackernes_full',\r\n        sql=&quot;&quot;&quot;\r\n        SELECT\r\n          FORMAT_TIMESTAMP('%Y%m%d', timestamp) AS date\r\n        FROM\r\n          `bigquery-public-data.hacker_news.full`\r\n        WHERE 1=1\r\n          AND type = 'story'\r\n          AND FORMAT_TIMESTAMP('%Y%m%d', timestamp) = '{{ yesterday_ds_nodash }}'\r\n          LIMIT 1\r\n        &quot;&quot;&quot;\r\n    )\r\n\r\n    bq_write_to_github_daily_metrics = BigQueryOperator(\r\n        task_id='bq_write_to_github_daily_metrics',\r\n        write_disposition='WRITE_TRUNCATE',\r\n        allow_large_results=True,\r\n        sql=&quot;&quot;&quot;\r\n        WITH base AS (\r\n          SELECT\r\n            FORMAT_TIMESTAMP('%Y%m%d', created_at) AS date,\r\n            actor.id AS actor_id,\r\n            repo.name AS repo,\r\n            type\r\n          FROM\r\n            `githubarchive.day.{{ yesterday_ds_nodash }}`\r\n          WHERE\r\n            type IN ('WatchEvent', 'ForkEvent')\r\n          )\r\n        SELECT\r\n          date,\r\n          repo,\r\n          SUM(CASE WHEN type = 'WatchEvent' THEN 1\r\n            ELSE NULL END) AS stars,\r\n          SUM(CASE WHEN type = 'ForkEvent' THEN 1\r\n            ELSE NULL END) AS forks\r\n        FROM\r\n          base\r\n        GROUP BY date, repo\r\n        &quot;&quot;&quot;,\r\n        destination_dataset_table='my-project-id.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'\r\n    )\r\n\r\n    bq_write_to_github_agg = BigQueryOperator(\r\n   
     task_id='bq_write_to_github_agg',\r\n        write_disposition='WRITE_TRUNCATE',\r\n        allow_large_results=True,\r\n        sql=&quot;&quot;&quot;\r\n        select \r\n          '{{ yesterday_ds_nodash }}' as date,\r\n          repo,\r\n          sum(stars) as stars_last_28_days,\r\n          sum(\r\n              case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then stars \r\n              else null end) as stars_last_7_days,\r\n          sum(\r\n              case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then stars \r\n              else null end) as stars_last_1_day,\r\n          sum(forks) as forks_last_28_days,\r\n          sum(\r\n              case when _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -6) }}') and timestamp('{{ yesterday_ds }}') then forks \r\n              else null end) as forks_last_7_days,\r\n          sum(\r\n              case when _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}') then forks \r\n              else null end) as forks_last_1_day\r\n        from \r\n            `my-project-id.github_trends.github_daily_metrics`\r\n        where \r\n            _PARTITIONTIME between timestamp('{{ macros.ds_add(ds, -27) }}') and timestamp('{{ yesterday_ds }}') \r\n        group by date, repo\r\n        &quot;&quot;&quot;,\r\n        destination_dataset_table='my-project-id.github_trends.github_agg${{ yesterday_ds_nodash }}'\r\n    )\r\n\r\n    bq_write_to_hackernews_agg = BigQueryOperator(\r\n        task_id='bq_write_to_hackernews_agg',\r\n        write_disposition='WRITE_TRUNCATE',\r\n        allow_large_results=True,\r\n        sql=&quot;&quot;&quot;\r\n        select\r\n          format_timestamp('%Y%m%d', timestamp) as date,\r\n          `by` as submitter,\r\n          id as story_id,\r\n          regexp_extract(url, 
&quot;https?:\/\/github.com\/&#x5B;^\/]*\/&#x5B;^\/#?]*&quot;) as url,\r\n          sum(score) as score\r\n        from \r\n          `bigquery-public-data.hacker_news.full`\r\n        where\r\n          type = 'story'\r\n          and timestamp &gt; '{{ yesterday_ds }}'\r\n          and timestamp &lt; '{{ ds }}'\r\n          and url like '%https:\/\/github.com%'\r\n          and url not like '%github.com\/blog\/%'\r\n        group by \r\n          date, submitter, story_id, url\r\n        &quot;&quot;&quot;,\r\n        destination_dataset_table='my-project-id.github_trends.hackernews_agg${{ yesterday_ds_nodash }}'\r\n    )\r\n\r\n    bq_write_to_hackernews_github_agg = BigQueryOperator(\r\n        task_id='bq_write_to_hackernews_github_agg',\r\n        write_disposition='WRITE_TRUNCATE',\r\n        allow_large_results=True,\r\n        sql=&quot;&quot;&quot;\r\n        with base1 as (\r\n            select * \r\n            from `my-project-id.github_trends.hackernews_agg`\r\n            where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')\r\n        ),\r\n        base2 as (\r\n            select \r\n              repo,\r\n              concat('https:\/\/github.com\/', repo) as url,\r\n              stars_last_28_days,\r\n              stars_last_7_days,\r\n              stars_last_1_day,\r\n              forks_last_28_days,\r\n              forks_last_7_days,\r\n              forks_last_1_day,\r\n            from `my-project-id.github_trends.github_agg`\r\n            where _PARTITIONTIME between timestamp('{{ yesterday_ds }}') and timestamp('{{ yesterday_ds }}')\r\n        )\r\n        select \r\n          base1.date as date,\r\n          base1.url as github_url,\r\n          base2.repo as github_repo,\r\n          base1.score as hn_score,\r\n          base1.story_id as hn_story_id,\r\n          base2.stars_last_28_days as stars_last_28_days,\r\n          base2.stars_last_7_days as stars_last_7_days,\r\n          
base2.stars_last_1_day as stars_last_1_day,\r\n          base2.forks_last_28_days as forks_last_28_days,\r\n          base2.forks_last_7_days as forks_last_7_days,\r\n          base2.forks_last_1_day as forks_last_1_day\r\n        from base1 left join base2 on base1.url = base2.url\r\n        &quot;&quot;&quot;,\r\n        destination_dataset_table='my-project-id.github_trends.hackernews_github_agg${{ yesterday_ds_nodash }}'\r\n    )\r\n\r\n    bq_check_hackernews_github_agg = BigQueryCheckOperator(\r\n        task_id='bq_check_hackernews_github_agg',\r\n        sql=&quot;&quot;&quot;\r\n        SELECT _PARTITIONTIME as pt\r\n        FROM `my-project-id.github_trends.hackernews_github_agg`\r\n        where format_timestamp('%Y%m%d', _PARTITIONTIME) = '{{ yesterday_ds_nodash }}'\r\n        order by 1\r\n        limit 1\r\n        &quot;&quot;&quot;\r\n    )\r\n\r\n    &#x5B;bq_check_githubarchive_day &gt;&gt; bq_write_to_github_daily_metrics &gt;&gt; bq_write_to_github_agg,\r\n     bq_check_hackernes_full &gt;&gt; bq_write_to_hackernews_agg] &gt;&gt; bq_write_to_hackernews_github_agg &gt;&gt; bq_check_hackernews_github_agg\r\n<\/pre>\n<p>&nbsp;<br \/>\nbackfill \u30b3\u30de\u30f3\u30c9\u304c Airflow 1.x \u306e\u6642\u3068\u4eca\u4f7f\u3063\u3066\u308b Airflow 2.x \u3068\u3067\u69cb\u6587\u304c\u3061\u3087\u3063\u3068\u5909\u308f\u3063\u3066\u305f\u3002<br \/>\n2.x \u3060\u3068\u3053\u3046\u306a\u308b\u3002<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\n$ airflow dags backfill -t bq_write_to_github_daily_metrics -s 2021-06-02 -e 2021-07-01 bq_hn_trends\r\n<\/pre>\n<p>Cloud Composer \u4f7f\u3046\u3068\u9ad8\u3044\u304b\u3089\u3001e2-small \u30a4\u30f3\u30b9\u30bf\u30f3\u30b9\u3067 Airflow \u3092\u81ea\u524d\u3067\u52d5\u304b\u3057\u3066\u308b\u3002<br \/>\ne2-micro \u3060\u3068\u30e1\u30e2\u30ea\u4e00\u676f\u306b\u306a\u3063\u3066\u30d5\u30ea\u30fc\u30ba\u3057\u305f\uff57<\/p>\n<p>Airflow 
\u306e\u304a\u304b\u3052\u3067\u4eca\u307e\u3067\u4e00\u6240\u61f8\u547d bash + cron \u3067\u904b\u7528\u3057\u3066\u305f\u306e\u304c\u30a2\u30db\u3089\u3057\u304f\u306a\u3063\u3066\u3057\u307e\u3063\u3066\u3082\u3046\u623b\u308c\u306a\u3044\u3002<\/p>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>&nbsp; Apache Airflow \u4f7f\u3044\u51fa\u3057\u305f\u3089\u4fbf\u5229\u3060\u3063\u305f\u306e\u3067\u30e1\u30e2\u3002 Airflow \u3068 BigQuery \u3092\u7e4b\u3052\u308b\u65b9\u6cd5\u3092\u63a2\u3057\u3066\u305f\u3089\u2193\u306e\u30d6\u30ed\u30b0\u8a18\u4e8b\u3092\u898b\u3064\u3051\u305f\u3002 How to aggregate data for  [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[21,20],"tags":[],"class_list":["post-1540","post","type-post","status-publish","format-standard","hentry","category-data-engineering","category-sql"],"_links":{"self":[{"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/posts\/1540","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/comments?post=1540"}],"version-history":[{"count":8,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/posts\/1540\/revisions"}],"predecessor-version":[{"id":1555,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/posts\/1540\/revisions\/1555"}],"wp:attachment":[{"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/media?parent=1540"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/categories?post=1540"},{"taxonomy":"post_tag","
embeddable":true,"href":"https:\/\/www.kwonline.org\/memo2\/wp-json\/wp\/v2\/tags?post=1540"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}