Monitoring Airflow workflows
This topic describes how to set up Unravel Server to monitor Airflow workflows so you can see them in Unravel Web UI.
Important
Due to an Airflow bug in v1.10.0, Unravel only supports v1.10.1+, not v1.10.0.
Note
Before you start, ensure the Unravel Server host and the server that runs the Airflow web service are in the same cluster.
All the following steps are performed on the Unravel Server host that runs the unravel_jcs2 daemon.
Connecting to your Airflow Web UI
In /usr/local/unravel/etc/unravel.properties, update or add the following properties.

HTTP for Airflow Web UI access

If your Airflow Web UI uses HTTP, set these properties:

com.unraveldata.airflow.protocol=http
com.unraveldata.airflow.server.url=airflow-ui-url
com.unraveldata.airflow.available=true

HTTPS for Airflow Web UI access
If your Airflow Web UI uses HTTPS, set these properties:
com.unraveldata.airflow.server.url=airflow-ui-url
com.unraveldata.airflow.available=true
com.unraveldata.airflow.login.name=airflow-ui-username
com.unraveldata.airflow.login.password=airflow-ui-password
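For example, if your Airflow Web UI were reachable at a hypothetical address https://airflow.example.com:8080 with a hypothetical admin account, the HTTPS block would look something like this (all values below are placeholders; substitute your own):

com.unraveldata.airflow.server.url=https://airflow.example.com:8080
com.unraveldata.airflow.available=true
com.unraveldata.airflow.login.name=admin
com.unraveldata.airflow.login.password=admin-password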
Restart the unravel_jcs2 daemon.

sudo /etc/init.d/unravel_jcs2 restart
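You can also confirm from the Unravel Server host that the Airflow Web UI is reachable (per the note above, both hosts must be in the same cluster). This is only an optional connectivity check; the URL below is a placeholder for your own airflow-ui-url:

# Expect an HTTP response code (200, or a redirect to the Airflow login page).
curl -kI https://airflow.example.com:8080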
Changing the Monitoring Window
By default, Unravel Server ingests all the workflows that started within the last five (5) days. You can change the date range to the last X days.
Open /usr/local/unravel/etc/unravel.properties and update the following property. If you can't find it, add it. Note that there is a "-" (minus sign) in the value.

airflow.look.back.num.days=-X
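For example, to ingest all workflows that started within the last 10 days:

airflow.look.back.num.days=-10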
Restart the unravel_jcs2 daemon.

sudo /etc/init.d/unravel_jcs2 restart
Enabling Airflow
Here is a sample script, spark-test.py, for Spark.
spark-test.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
In Airflow, workflows are represented by directed acyclic graphs (DAGs). For example,
dag = DAG('spark-test', default_args=default_args)
Add hooks for Unravel instrumentation.
The following script, example-hdp-client.sh, adds hooks for Unravel instrumentation by setting three Unravel-specific configuration parameters for Spark applications:

spark.driver.extraJavaOptions
spark.executor.extraJavaOptions
spark.unravel.server.hostport

We recommend setting these parameters on a per-application basis, only when you want to monitor or profile certain applications rather than every application running in the cluster. Alternatively, you can specify these parameters in spark-defaults.conf.

The script can be invoked from an Airflow workflow to submit a Spark application via spark-submit. It references the following variables, which need to be changed to values valid for your local environment.
PATH_TO_SPARK_EXAMPLE_JAR=/usr/hdp/2.3.6.0-3796/spark/lib/spark-examples-*.jar
UNRAVEL_SERVER_IP_PORT=10.20.30.40:4043
SPARK_EVENT_LOG_DIR=hdfs://ip-10-0-0-21.ec2.internal:8020/user/ec2-user/eventlog
example-hdp-client.sh
hdfs dfs -rmr pair.parquet
spark-submit \
  --class org.apache.spark.examples.sql.RDDRelation \
  --master yarn-cluster \
  --conf "spark.driver.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=DriverProbe.class:SQLProbe.class -Dcom.sun.btrace.FileClient.flush=-1 -Dcom.unraveldata.spark.sensor.disableLiveUpdates=true" \
  --conf "spark.executor.extraJavaOptions=-javaagent:/usr/local/unravel_client/btrace-agent.jar=unsafe=true,stdout=false,noServer=true,startupRetransform=false,bootClassPath=/usr/local/unravel_client/unravel-boot.jar,systemClassPath=/usr/local/unravel_client/unravel-sys.jar,scriptOutputFile=/dev/null,script=ExecutorProbe.class -Dcom.sun.btrace.FileClient.flush=-1" \
  --conf "spark.unravel.server.hostport=$UNRAVEL_SERVER_IP_PORT" \
  --conf "spark.eventLog.dir=$SPARK_EVENT_LOG_DIR" \
  --conf "spark.eventLog.enabled=true" \
  $PATH_TO_SPARK_EXAMPLE_JAR
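Before wiring the script into a DAG, you may want to run it once by hand to confirm that spark-submit succeeds and that the application shows up in Unravel. The path below assumes you saved the script under ~/airflow/dags/example-scripts/, matching the operators in the next step:

cd ~/airflow/dags/example-scripts
bash example-hdp-client.sh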
Submit the workflow.
Operators (also called tasks) determine execution order (dependencies). In the example below, t1 and t2 are operators created by BashOperator and PythonOperator. Each invokes example-hdp-client.sh, which submits the workflow for execution.

Note

The path name of example-hdp-client.sh passed to subprocess.Popen is relative to the current directory, not to ~/airflow/dags as it is in the BashOperator.

t1 = BashOperator(
    task_id='example-hdp-client',
    bash_command="example-scripts/example-hdp-client.sh",
    retries=3,
    dag=dag)

def spark_callback(**kwargs):
    sp = subprocess.Popen(['/bin/bash', 'airflow/dags/example-scripts/example-hdp-client.sh'],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
    print(sp.stdout.read())

t2 = PythonOperator(
    task_id='example-python-call',
    provide_context=True,
    python_callable=spark_callback,
    retries=1,
    dag=dag)

t2.set_upstream(t1)
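Airflow also lets you declare the same dependency with the bitshift operator, which some DAG authors find more readable than set_upstream:

# Equivalent to t2.set_upstream(t1): run t1 first, then t2.
t1 >> t2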
Tip
You can test the operators first. For example, in Airflow:
airflow test spark-test example-python-call
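Once the DAG file is in place under ~/airflow/dags and the scheduler is running, you can also run the whole workflow from the Airflow CLI. The DAG id spark-test comes from the example above; depending on your configuration, you may need to unpause the DAG first:

airflow unpause spark-test
airflow trigger_dag spark-test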