[Airflow] HiveOperator

2021. 12. 26.Career/Python&Airflow

 

hive DB 내 데이터를 꺼내 가공 후 집계 테이블을 제작하기 위해 hive 쿼리를 주기적으로 수행해주는 스케쥴링 작업이 필요하여 airflow를 이용해보고자 하였다. 마침 airflow에는 특정 hive database에서 hql이나 코드를 수행해주는 hiveOperator가 있기에 해당 Operator를 이용해보았다. 

 

from airflow.operators.python_operator import HiveOperator

dag = DAG(
    'ppa_pbs_data_onboarding_selection_tagging_items_daily_v1',
    schedule_interval="@once",
    max_active_runs=1,
    default_args=default_args
)

create_sample_table = HiveOperator(
    task_id='create_sample_table',
    hive_cli_conn_id='sample_conn',
    hiveconf_jinja_translate=True,
    hql='select id, name from user_sample limit 10;',
    dag=dag
)

 

 

airflow에서 hive operator를 사용하기 전에 먼저, connectionId를 생성하여야 하는 데, 특정 connection 정보는 다음 문서에서와 같이 airflow admin 화면에서 생성할 수 있다. 그리고 hive Operator에 필요한 항목 하나씩 살펴 보자면, 

  • task_id : 모든 Operator에 공통으로 있는 항목으로 dag내 task 작업을 식별하기 위한 이름을 부여한다. 
  • hive_cli_conn_id : hive client에 접속하기 위한 connectionId를 입력한다. Operator가 아닌 DAG 선언문 내 default_args에 기입하여 연결을 시도할 수 있다. 
  • hql : 실행할 hive 쿼리를 입력한다. 위와 같이 입력할수도 있고, 변수, 파일 경로 등의 형태로도 기입할 수 있다. 
  • hiveconfs : hive의 설정을 등록할 때 사용하는 항목이다. { 'key' : 'value' } 형태로 작성하면 -hiveconf "key"="value" 명령어를 통해 hive의 설정을 변경할 수 있다. 
  • hiveconf_jinja_translate : True로 하게 되면 hive 설정 템플릿 내 변수를 jinja template 내 변수의 형태로 변경해주는 작업을 수행한다. DAG의 변수를 hive 쿼리 내에서 사용하고자 할 때 True로 만들어 사용할 수 있다. 예를 들어, DAG 생성을 통해 execution_date 값을 이용하여 {{(execution_date + macros.timedelta(hours=9)).strftime('%Y%m%d%H%M%S')}} 를 hql에서 사용할 수 있다. 
  • 그 외에도, Hadoop에서 MapReduce 작업 설정을 위해 사용할 수 있는 항목들이 있다. 

 

'Career > Python&Airflow' 카테고리의 다른 글

Pandas로 MongoDB 데이터 쉽게 옮기기  (0) 2022.07.21
myoskin