浅谈调度工具——Airflow

本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web 界面、DAG 配置、常用配置以及 Airflow DAG Creation Manager Plugin 这一款 Airflow 插件。

一、什么是 Airflow

Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015 年春季开源,2016 年加入 Apache 软件基金会的孵化计划。

Airflow 通过 DAG 也即是有向非循环图来定义整个工作流,因而具有非常强大的表达能力。

如上图所示,一个工作流可以用一个 DAG 来表示,在 DAG 中将完整得记录整个工作流中每个作业之间的依赖关系、条件分支等内容,并可以记录运行状态。通过 DAG,我们可以精准的得到各个作业之间的依赖关系。

在进一步介绍 Airflow 之前,我想先介绍一些在 Airflow 中常见的名词概念:

  • DAG
    DAG 意为有向无循环图,在 Airflow 中则定义了整个完整的作业。同一个 DAG 中的所有 Task 拥有相同的调度时间。
  • Task
    Task 为 DAG 中具体的作业任务,它必须存在于某一个 DAG 之中。Task 在 DAG 中配置依赖关系,跨 DAG 的依赖是可行的,但是并不推荐。跨 DAG 依赖会导致 DAG 图的直观性降低,并给依赖管理带来麻烦。
  • DAG Run
    当一个 DAG 满足它的调度时间,或者被外部触发时,就会产生一个 DAG Run。可以理解为由 DAG 实例化的实例。
  • Task Instance
    当一个 Task 被调度启动时,就会产生一个 Task Instance。可以理解为由 Task 实例化的实例。

二、Airflow 的服务构成

一个正常运行的 Airflow 系统一般由以下几个服务构成

  • WebServer
    Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
  • Worker
    一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
  • Scheduler
    整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
  • Flower
    Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。

三、Airflow 的 Web 界面

下面简单介绍一下 Airflow 的 Web 操作界面,从而可以对 Airflow 有一个更直观的了解。

1、DAG 列表

  1. 左侧 On/Off 按钮控制 DAG 的运行状态,Off 为暂停状态,On 为运行状态。注意:所有 DAG 脚本初次部署完成时均为 Off 状态。
  2. 若 DAG 名称处于不可点击状态,可能为 DAG 被删除或未载入。若 DAG 未载入,可点击右侧刷新按钮进行刷新。注意:由于可以部署若干 WebServer,所以单次刷新可能无法刷新所有 WebServer 缓存,可以尝试多次刷新。
  3. Recent Tasks 会显示最近一次 DAG Run(可以理解为 DAG 的执行记录)中 Task Instances(可以理解为作业的执行记录)的运行状态,如果 DAG Run 的状态为 running,此时显示最近完成的一次以及正在运行的 DAG Run 中所有 Task Instances 的状态。
  4. Last Run 显示最近一次的 execution date。注意:execution date 并不是真实执行时间,具体细节在下文 DAG 配置中详述。将鼠标移至 execution date 右侧 info 标记上,会显示 start date,start date 为真实运行时间。start date 一般为 execution date 所对应的下次执行时间。

2、作业操作框

在 DAG 的树状图和 DAG 图中都可以点击对应的 Task Instance 以弹出 Task Instance 模态框,以进行 Task Instance 的相关操作。注意:选择的 Task Instance 为对应 DAG Run 中的 Task Instance。

  1. 在作业名字的右边有一个漏斗符号,点击后整个 DAG 的界面将只显示该作业及该作业的依赖作业。当该作业所处的 DAG 较大时,此功能有较大的帮助。
  2. Task Instance Details 显示该 Task Instance 的详情,可以从中得知该 Task Instance 的当前状态,以及处于当前状态的原因。例如,若该 Task Instance 为 no status 状态,迟迟不进入 queued 及 running 状态,此时就可通过 Task Instance Details 中的 Dependency 及 Reason 得知原因。
  3. Rendered 显示该 Task Instance 被渲染后的命令。
  4. Run 指令可以直接执行当前作业。
  5. Clear 指令为清除当前 Task Instance 状态,清除任意一个 Task Instance 都会使当前 DAG Run 的状态变更为 running。注意:如果被清除的 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。Clear 有额外的5个选项,均为多选,这些选项从左到右依次为:
    Past: 同时清除所有过去的 DAG Run 中此 Task Instance 所对应的 Task Instance。
    Future: 同时清除所有未来的 DAG Run 中此 Task Instance 所对应的 Task Instance。注意:仅清除已生成的 DAG Run 中的 Task Instance。
    Upstream: 同时清除该 DAG Run 中所有此 Task Instance 上游的 Task Instance。
    Downstream: 同时清除该 DAG Run 中所有此 Task Instance 下游的 Task Instance。
    Recursive: 当此 Task Instance 为 sub DAG 时,循环清除所有该 sub DAG 中的 Task Instance。注意:若当此 Task Instance 不是 sub DAG 则忽略此选项。
  6. Mark Success 指令为讲当前 Task Instance 状态标记为 success。注意:如果该 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。

四、DAG 配置

Airflow 中的 DAG 是由 Python 脚本来配置的,因而可扩展性非常强。Airflow 提供了一些 DAG 例子,我们可以通过一个例子来简单得了解一下。

# -*- coding: utf-8 -*-

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    schedule_interval='0 0 * * *')

cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)

run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag)
run_this.set_downstream(run_this_last)

for i in range(3):
    i = str(i)
    task = BashOperator(
        task_id='runme_'+i,
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        dag=dag)
    task.set_downstream(run_this)

task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
task.set_downstream(run_this_last)

我们可以看到,整个 DAG 的配置就是一份完整的 Python 代码,在代码中实例化 DAG,实例化适合的 Operator,并通过 set_downstream 等方法配置上下游依赖关系。下面我们简单看一下在 DAG 配置中的几个重要概念。

  • DAG
    要配置一个 DAG 自然需要一个 DAG 实例。在同一个 DAG 下的所有作业,都需要将它的 dag 属性设置为这个 DAG 实例。在实例化 DAG 时,通过传参数可以给这个 DAG 实例做一些必要的配置。

    • dag_id
      给 DAG 取一个名字,方便日后维护。
    • default_args
      默认参数,当属于这个 DAG 实例的作业没有配置相应参数时,将使用 DAG 实例的 default_args 中的相应参数。
    • schedule_interval
      配置 DAG 的执行周期,语法和 crontab 的一致。
  • 作业 (Task)
    Airflow 提供了很多 Operator,我们也可以自行编写新的 Operator。在本例中使用了 2 种 Operator,DummyOperator 什么都不会做, BashOperator 则会执行 bash_command 参数所指定的 bash 指令,并且使用 jinja2 模版引擎,对该指令进行渲染,因而在本例的 bash_command 中,可以看到一些需要渲染的变量。当 Operator 被实例化后,我们称之为相应 DAG 的一个作业(Task)。在实例化 Operator 时,同样可以通过穿参数进行必要的配置,值得注意的是,如果在 DAG 中有设置 default_args 而在 Operator 中没有覆盖相应配置,则会使用 default_args 中的配置。

    • dag
      传递一个 DAG 实例,以使当前作业属于相应 DAG。
    • task_id
      给作业去一个名字,方便日后维护。
    • owner
      作业的拥有者,方便作业维护。另外有些 Operator 会根据该参数实现相应的权限控制。
    • start_date
      作业的开始时间,即作业将在这个时间点以后开始调度。
  • 依赖
    配置以来的方法有两种,除了可以使用作业实例的 set_upstream 和 set_downstream 方法外,还可以使用类似

    task1 << task2 << task3
    task3 >> task4

    这样更直观的语法来设置。

这里我们要特别注意一个关于调度执行时间的问题。在谈这个问题前,我们先确定几个名词:

  • start date: 在配置中,它是作业开始调度时间。而在谈论执行状况时,它是调度开始时间。
  • schedule interval: 调度执行周期。
  • execution date: 执行时间,在 Airflow 中称之为执行时间,但其实它并不是真实的执行时间。

那么现在,让我们看一下当一个新配置的 DAG 生效后第一次调度会在什么时候。很多人会很自然的认为,第一次的调度时间当然是在作业中配置的 start date,但其实并不是。第一次调度时间是在作业中配置的 start date 的第二个满足 schedule interval 的时间点,并且记录的 execution date 为作业中配置的 start date 的第一个满足 schedule interval 的时间点。听起来很绕,让我们来举个例子。

假设我们配置了一个作业的 start date 为 2017年10月1日,配置的 schedule interval 为 00 12 * * * 那么第一次执行的时间将是 2017年10月2日 12点 而此时记录的 execution date 为 2017年10月1日 12点。因此 execution date 并不是如其字面说的表示执行时间,真正的执行时间是 execution date 所显示的时间的下一个满足 schedule interval 的时间点。

另外,当作业已经执行过之后,start date 的配置将不会再生效,这个作业的调度开始时间将直接按照上次调度所对应的 execution date 来计算。

这个例子只是简要的介绍了一下 DAG 的配置,也只介绍了非常少量的配置参数。Airflow 为 DAG 和作业提供了大量的可配置参数,详情可以参考 Airflow 官方文档

五、常用配置

在日常工作中,有时候仅仅靠配置作业依赖和调度执行周期并不能满足一些复杂的需求。接下来将介绍一些常用的作业配置。

1、跳过非最新 DAG Run

假如有一个每小时调度的 DAG 出错了,我们把它的调度暂停,之后花了3个小时修复了它,修复完成后重新启动这个作业的调度。于是 Airflow 一下子创建了 3 个 DAG Run 并同时执行,这显然不是我们希望的,我们希望它只执行最新的 DAG Run。

我们可以创建一个 Short Circuit Operator,并且让 DAG 中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前 DAG Run 是否为最新,不是最新的直接跳过整个 DAG。

def skip_dag_not_latest_worker(ds, **context):
    if context['dag_run'] and context['dag_run'].external_trigger:
        logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
        return True

    skip = False
    now = datetime.now()
    left_window = context['dag'].following_schedule(context['execution_date'])
    right_window = context['dag'].following_schedule(left_window)
    logging.info('Checking latest only with left_window: %s right_window: %s now: %s', left_window, right_window, now)

    if not left_window < now <= right_window:
        skip = True
    return not skip

ShortCircuitOperator(
    task_id='skip_dag_not_latest',
    provide_context=True,
    python_callable=skip_dag_not_latest_worker,
    dag=dag
)

2、当存在正在执行的 DAG Run 时跳过当前 DAG Run

依旧是之前提到的每小时调度的 DAG,假设它这次没有出错而是由于资源、网络或者其他问题导致执行时间变长,当下一个调度时间开始时 Airflow 依旧会启动一次新的 DAG Run,这样就会同时出现 2 个 DAG Run。如果我们想要避免这种情况,一个简单的方法是直接将 DAG 的 max_active_runs 设置为 1。但这样会导致 DAG Run 堆积的问题,如果你配置的调度是早上 9 点至晚上 9 点,直至晚上 9 点之后 Airflow 可能依旧在处理堆积的 DAG Run。这样就可能影响到我们原本安排在晚上 9 点之后的任务。

我们可以创建一个 Short Circuit Operator,并且让 DAG 中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前是否存在正在执行的 DAG Run,存在时则直接跳过整个 DAG。

def skip_dag_when_previous_running_worker(ds, **context):
    if context['dag_run'] and context['dag_run'].external_trigger:
        logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
        return True

    skip = False
    session = settings.Session()
    count = session.query(DagRun).filter(
        DagRun.dag_id == context['dag'].dag_id,
        DagRun.state.in_(['running']),
    ).count()
    session.close()
    logging.info('Checking running DAG count: %s' % count)
    skip = count > 1
    return not skip

ShortCircuitOperator(
    task_id='skip_dag_when_previous_running',
    provide_context=True,
    python_callable=skip_dag_when_previous_running_worker,
    dag=dag
)

3、Sensor 的替代方案

Airflow 中有一类 Operator 被称为 Sensor,Sensor 可以感应预先设定的条件是否满足(如:某个时间点是否达到、某条 MySQL 记录是否被更新、某个 DAG 是否完成),当满足条件后 Sensor 作业变为 Success 使得下游的作业能够执行。Sensor 的功能很强大但却带来一个问题,假如我们有一个 Sensor 用于检测某个 MySQL 记录是否被更新,在 Sensor 作业启动后 3 个小时这个 MySQL 记录才被更新。于是我们的这个 Sensor 占用了一个 Worker 整整 3 小时,这显然是一个极大的浪费。

因此我们需要一个 Sensor 的替代方案,既能满足 Sensor 原来的功能,又能节省 Worker 资源。有一个办法是不使用 Sensor,直接使用 Python Operator 判断预先设定的条件是否满足,如果不满足直接 raise Exception,然后将这个作业的 retry_delay(重试间隔时间) 设为每次检测的间隔时间,retries(重试次数) 设为最长检测时间除以 retry_delay,即满足:最长检测时间 = retries * retry_delay。这样既不会长时间占用 Worker 资源,又可以满足 Sensor 原来的功能。

六、Airflow DAG Creation Manager Plugin

正如上两章所描述的,Airflow 虽然具有强大的功能,但是配置 DAG 并不是简单的工作,也有一些较为繁琐的概念,对于业务人员来说可能略显复杂。因此,笔者编写了 Airflow DAG Creation Manager Pluginhttps://github.com/lattebank/airflow-dag-creation-manager-plugin)以提供一个 Web界面来让业务人员可视化的编写及管理 DAG。具体的安装及使用方法请查看插件的README

如上图所示,插件的 Web 界面中可以直接所见即所得的编写 DAG 图。

插件中尽量简化了一些繁琐的诸如上文所述的作业开始调度时间等一系列的概念,并提供了一些在实际工作中常常会用到的一些额外的功能(如上文提到的跳过非最新 DAG Run、当存在正在执行的 DAG Run 时跳过当前 DAG Run 等),以及版本控制和权限管理。如果大家在使用 Airflow 的过程中也有类似的问题,欢迎尝试使用 Airflow DAG Creation Manager Plugin。

七、总结

Airflow 适用于调度作业较为复杂,特别是各作业之间的依赖关系复杂的情况。

希望本文能让大家对 Airflow 有所了解,并能将 Airflow 运用到适合它使用的场景中。

Presto + Hive 统一账户体系及查询监控轻型解决方案

Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运行。其优点是学习成本低,可以通过类 SQL 语句快速实现简单的 MapReduce 统计,不必开发专门的 MapReduce 应用,十分适合数据仓库的统计分析,是专为批处理设计的。但随着数据越来越多,使用 Hive 进行一个简单的数据查询可能要花费几分钟到几小时,显然不能满足交互式查询的需求。Presto 通过分布式查询,可以近实时快速高效的完成海量数据的查询。Presto 查询引擎是一个 Master-Slave 的架构,由一个 Coordinator 节点,多个 Worker 节点组成。Presto 被设计为数据仓库和数据分析产品:数据即席查询分析、大规模数据聚集和生成报表。Hue 是一个开源的 Apache Hadoop UI 系统,由 Cloudera Desktop 演化而来,最后 Cloudera 公司将其贡献给 Apache 基金会的 Hadoop 社区,它是基于 Python Web 框架 Django 实现的。使用 Hue 我们可以在浏览器端的 Web 控制台上与 Hadoop 集群进行交互来分析处理数据,例如操作 HDFS 上的数据,运行 MapReduce Job,执行 Hive 的 SQL 语句,浏览 HBase 数据库等等。

本文将介绍如何利用这些组件搭建及管理大数据交互平台,主要包括两节内容。第一节介绍平台搭建,包括 Hive 和 Hue 双活搭建,重点介绍如何利用 Slider 搭建 Presto on Yarn 平台。第二节介绍权限管理,包括 Hue 账号体系介绍,Hive 超级用户管理,Hive 集成 Hue 账号体系,presto-jdbc 集成 Hue 账号体系。

平台搭建

利用 Cloudera Manager 可以很方便的部署 Hue 和 Hive,具体的搭建过程本文就不再详述,我们采用如图所示的结构利用 haproxy 实现双活及负载均衡。日常工作中,数据分析人员主要使用 Hue 来提交交互式查询语句,每天也会有调度工具以及应用程序完成批量作业的查询,可以是直接利用 beeline 客户端、或者使用 python 脚本封装好的存储过程、或者是通过 jdbc 连接来提交查询任务。

利用 Slider 搭建 Presto on Yarn 简介

Presto 查询引擎是一个 Master-Slave 的架构,由一个 Coordinator 节点,一个 Discovery Server 节点,多个 Worker 节点组成,Discovery Server 通常内嵌于 Coordinator 节点中。Coordinator 负责解析 SQL 语句,生成执行计划,分发执行任务给 Worker 节点执行。Worker 节点负责实际执行查询任务。Worker 节点启动后向 Discovery Server 服务注册,Coordinator 从 Discovery Server 获得可以正常工作的 Worker 节点。如果配置了 Hive Connector,需要配置一个 Hive MetaStore 服务为 Presto 提供 Hive 元信息,Worker 节点与 HDFS 交互读取数据。

Yarn 的架构中,一个全局 ResourceManager 以主要后台进程的形式运行,它通常在专用机器上运行,在各种竞争的应用程序之间仲裁可用的集群资源。ResourceManager 会追踪集群中有多少可用的活动节点和资源,协调用户提交的哪些应用程序应该在何时获取这些资源。ResourceManager 是唯一拥有此信息的进程,所以它可通过某种共享的、安全的、多租户的方式制定分配(或者调度)决策。Presto 的 Master-Slave 的架构可以完全将 Coordinator 和 Worker 作为单个的容器交给 Yarn 来统一管理,包括资源的分配及程序的活动管理,这样集群中每台机器的资源都可以有 Yarn 统一协调分配资源,而不用人为的单独从机器中划分一部分资源运行 Worker 或 Coordinator。

一般来说基于 Yarn 开发应用,需要用户自己编写 Application Master 来处理资源申请、容错等,难度和复杂性比较大。Apache 二级孵化项目 Slider 是 Yarn 之外的孵化项目,目的是将用户的已有服务或者应用直接部署到 Yarn 上。随着 Slider 项目的发布,用户可以在不对已存在服务进行任何修改的前提下将之部署到 Yarn 集群中。Slider 可以用来部署分布式应用(如 Hbase、Presto)到 Yarn 集群,并且 Slider 可以对应用进行监控,还可以在不停止应用的情况下动态的扩大或者缩小应用的规模。Yarn 监控 container 的状态并通知 Slider-appmaster,如果某个 container 失败,Slider-appmaster 会向 Yarn 申请一个新的 container。

本小节接下来将详细介绍如何利用 Slider 将 Presto 搭建在 Yarn 上。

Yarn 基于 label 的 CapacityScheduler 设置

因为在 Presto 的使用过程中,需要知道 Presto 的 Coordinator 连接地址,为了使用方便最好是能将 Coordinator 固定在某台机器上。本文所采取的方案是利用 Yarn 的标签调度机制,给其中某台机器打上 Coordinator 标签,剩下的打上 Worker 标签以保证每次 Presto 提交到 Yarn 上时,Coordinator 这个组件都能运行在固定的机器上。所以在介绍搭建过程之前,先介绍 Yarn 基于 label 的 CapacityScheduler 设置。

给集群中的节点打上 label 是将相似特点的节点进行分组的一种途径,借助 queue 可以达到指定某些类型的 application 运行在标有特定标签的机器上。例如,Spark Streaming 实时长时服务与 MapReduce、Spark、Hive 等批处理应用共享 Yarn 集群资源。在共享环境中,经常因一个批处理应用占用大量网络资源或者 CPU 资源导致 Spark Streaming 资源被抢占,服务不稳定。基于标签的调度策略的基本思想是:用户可以为每个 nodemanager 标注几个标签,比如 highmem,highdisk 等,以表明该 nodemanager 的特性;同时,用户可以为调度器中每个队列标注几个标签,这样提交到某个队列中的作业,只会使用标注有对应标签的节点上的资源。

举个例子:假设某 hadoop 集群共有 7 个节点,硬件资源是 16GB 内存,4TB 磁盘;随着内存计算应用 spark streaming 程序越来越多,可以再另外添加 5 个大内存节点,比如内存是 64GB,为了让 spark 程序与 mapreduce 等其他程序更加和谐地运行在一个集群中,希望 spark 程序只运行在后来的 5 个大内存节点上,而之前的 mapreduce 程序既可以运行在之前的 7 个节点上,也可以运行在后来的 5 个大内存节点上,怎么办?

基于标签的调度解决方案是:
步骤 1:为旧的 7 个节点打上 normal 标签,为新的 5 个节点打上 highmem 标签;
步骤 2:在 capacity scheduler 中,创建两个队列,分别是 hadoop 和 spark,其中 hadoop 队列可使用的标签是 nornal 和 highmem,而 spark 则是 highmem,并配置两个队列的 capacity 和 maxcapacity。

这样就可以实现 spark 作业只能跑在 spark 集群上而不会影响生产的 mapreduce 作业,当无 spark 作业运行时,spark 集群的资源也可以被 mapreduce 作业使用。

在部署 Presto on Yarn 时,因为 Coordinator 所在的 container 受 Yarn 任意分配控制,每次部署 Presto 时都可能被部署在任意一个 nodemanager 上,也就是说用户的连接 host 不是固定的。这时可以通过指定 Coordinator 的 “yarn.label.expression”=”presto_coordinator” 参数进行限制,将 “presto_coordinator” 标签只打在一个节点上,这样就间接的达到了固定 coordinator 所在 host 的目的。

添加存放标签的 hdfs 目录

可以将节点与标签的对应关系数据保存在 hdfs 中,这样 yarn 在每次重启时都不会丢失。

hadoop fs -mkdir -p /yarn/node-labels
hadoop fs -chown -R cloudera-scm:cloudera-scm /yarn
hadoop fs -chmod -R 700 /yarn

yarn 设置 yarn-site.xml

<property>
  <name>yarn.node-labels.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.node-labels.fs-store.root-dir</name>
  <value>hdfs://nameservice1:8020/yarn/node-labels</value>
</property>

添加标签

# 在集群中注册标签
yarn rmadmin -addToClusterNodeLabels "presto_coordinator"
yarn rmadmin -addToClusterNodeLabels "presto_worker"

# 给节点添加标签
yarn rmadmin -replaceLabelsOnNode "hadoop01:8041,presto_coordinator,presto_worker"
yarn rmadmin -replaceLabelsOnNode "hadoop02:8041,presto_worker"
yarn rmadmin -replaceLabelsOnNode "hadoop03:8041,presto_worker"

# 删除节点上的标签
yarn rmadmin -replaceLabelsOnNode "hadoop01:8041"

# 查看节点的标签状态
yarn node -status hadoop01:8041

# 刷新队列配置
yarn rmadmin -refreshQueues

也可以在http://hadoop02:8088/cluster/nodes上查看到集群节点所有的标签。

设置队列

调度器与 node label 相关的配置项:

  • yarn.scheduler.capacity.<queue-path>.accessible-node-labels.<label>.capacity,特定的某个标签在子队列中设置的数值和必须是100
  • yarn.scheduler.capacity.<queue-path>.default-node-label-expression,提交到队列中的作业默认使用的标签,只能设置一个值。如果不设置表示 application 会从没有 node label 的节点中获取容器。

以 root.offline.log 子队列为例,配置文件如下。

<!-- offline队列label设置 -->

    <property>
      <name>yarn.scheduler.capacity.root.offline.accessible-node-labels.presto_worker.capacity</name>
      <description>队列对标签节点可用的百分比</description>
      <value>50</value>
    </property>

    <property>
      <name>yarn.scheduler.capacity.root.offline.accessible-node-labels.presto_worker.maximum-capacity</name>
      <description>队列对标签节点最大可用的百分比</description>
      <value>80</value>
    </property>

     <property>
    <name>yarn.scheduler.capacity.root.offline.accessible-node-labels.presto_coordinator.capacity</name>
    <value>50</value>
    <description>队列对标签节点可用的百分比</description>
    </property>

    <property>
    <name>yarn.scheduler.capacity.root.offline.accessible-node-labels.presto_coordinator.maximum-capacity</name>
    <value>80</value>
    <description>队列对标签节点可用的百分比</description>
    </property>

    <property>
      <name>yarn.scheduler.capacity.root.offline.accessible-node-labels</name>
      <value>*</value>
      <description>队列应用可用的节点标签</description>
    </property>

    <property>
      <name>yarn.scheduler.capacity.root.offline.default-node-label-expression</name>
      <value>presto_worker</value>
      <description>队列应用默认节点标签</description>
    </property>

 <!-- offline.log队列label设置 -->

  <property>
      <name>yarn.scheduler.capacity.root.offline.log.accessible-node-labels.presto_worker.capacity</name>
      <description>队列对标签节点可用的百分比</description>
      <value>50</value>
    </property>

    <property>
    <name>yarn.scheduler.capacity.root.offline.log.accessible-node-labels.presto_coordinator.capacity</name>
    <value>100</value>
    <description>队列对标签节点可用的百分比</description>
    </property>

    <property>
      <name>yarn.scheduler.capacity.root.offline.log.accessible-node-labels</name>
      <value>*</value>
      <description>队列应用可用的节点标签</description>
    </property>

    <property>
      <name>yarn.scheduler.capacity.root.offline.log.default-node-label-expression</name>
      <value>presto_worker</value>
      <description>队列应用默认节点标签</description>
    </property>

踩过的坑

提交任务一直处于 ACCEPTED 状态

任务处于 ACCEPTED 状态说明该任务所在的队列没有可用的资源。对每个父队列及子队列都必须设置yarn.scheduler.capacity.<queue-path>.accessible-node-labelsyarn.scheduler.capacity.<queue-path>.default-node-label-expression

部署 Presto on Yarn 时,worker 所在的 container 会多次失败

因为某次部署 presto 时启动的 container 没有被正常关闭,导致后台一直存在一个 worker 在运行占用了端口。

使用 Slider 部署 Presto on Yarn

在测试环境用 Slider 搭建好的 Presto on Yarn 如图所示,显示总共向 Yarn 申请了 4 个 container,1 个用于 slider-appmaster,1 个 coordinator,2 个 worker

用Slider部署Presto

1, 调整集群yarn-site.xml参数

yarn.scheduler.minimum-allocation-mb >= 256 (ensure that YARN can allocate sufficient number of containers)

yarn.nodemanager.delete.debug-delay-sec >= 3600 (to retain for an hour allow easy debugging)

2, 从链接下载slider-0.92.0-incubating-all.tar.gz到集群的任一个节点

ssh cloudera-scm@hadoop02
wget https://repo1.maven.org/maven2/org/apache/slider/slider-assembly/0.92.0-incubating/slider-assembly-0.92.0-incubating-all.tar.gz
tar -zxvf slider-0.92.0-incubating-all.tar.gz

3, 配置 JAVA_HOME 和 HADOOP_CONF_DIR

集群的每个机器都默认设置了 JAVA_HOME 指向 jdk7,所以只需要配置 HADOOP_CONF_DIR 为/etc/hadoop/conf

vim slider-0.92.0-incubating/conf/slider-env.sh
export JAVA_HOME=${JAVA_HOME}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}

4, 修改slider-0.92.0-incubating/conf/slider-client.xml配置 zookeeper 地址

<property>
  <name>slider.zookeeper.quorum</name>
  <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://nameservice1</value>
</property>

5, 编译 presto-yarn-package

presto-yarn下载源码编译得到presto-yarn-package-1.6-SNAPSHOT-0.167.zip

git clone git@github.com:prestodb/presto-yarn.git
cd presto-yarn
mvn clean package

在下载依赖包时会有两个很大的包占用很长时间,可以自己在网上下载之后放在~/.m2目录

6配置 appConfig.json

在 presto-yarn/presto-yarn-package/src/main/resources 目录下有 appConfig.json 和 resources-multinode.json 模板,在此基础上进行修改。

{
  "schema": "http://example.org/specification/v2.0.0",
  "metadata": {
  },
  "global": {
    "site.global.app_user": "cloudera-scm",
    "site.global.user_group": "cloudera-scm",
    "site.global.data_dir": "/var/lib/presto/data",
    "site.global.config_dir": "/var/lib/presto/etc",
    "site.global.app_name": "presto-server-0.167",
    "site.global.app_pkg_plugin": "${AGENT_WORK_ROOT}/app/definition/package/plugins/",
    "site.global.singlenode": "false",
    "site.global.coordinator_host": "${COORDINATOR_HOST}",
    "site.global.presto_query_max_memory": "2GB",
    "site.global.presto_query_max_memory_per_node": "1GB",
    "site.global.presto_server_port": "8083",
    "site.global.catalog": "{'Hive': ['connector.name=Hive-hadoop2','Hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml','Hive.metastore.uri=thrift://hadoop01:9083,thrift://hadoop02:9083'], 'tpch': ['connector.name=tpch']}",
    "site.global.jvm_args": "['-server','-Xmx2G','-XX:-UseBiasedLocking','-XX:+UseG1GC','-XX:G1HeapRegionSize=32M','-XX:+ExplicitGCInvokesConcurrent','-XX:+HeapDumpOnOutOfMemoryError','-XX:+UseGCOverheadLimit','-XX:OnOutOfMemoryError=kill -9 %p','-XX:ReservedCodeCacheSize=512M','-DHADOOP_USER_NAME=Hive']",
    "site.global.log_properties": "['com.facebook.presto=INFO']",
    "application.def": ".slider/package/PRESTO/presto-yarn-package-1.6-SNAPSHOT-0.167.zip",
    "java_home": "/usr/java/jdk1.8.0_131"
  },
  "components": {
    "slider-appmaster": {
      "jvm.heapsize": "128M"
    }
  }
}

配置细则参考文档 installation-yarn-configuration-options

  • 因为 slider 命令使用 cloudera-scm 用户执行,所以在 HDFS 中要先添加用户目录用于存放临时文件。
hdfs dfs -mkdir -p /user/cloudera-scm
hdfs dfs -chown cloudera-scm:cloudera-scm -R /user/cloudera-scm
  • 每个机器都要创建目录,存放部署配置文件及日志
sudo mkdir /var/lib/presto/{data,etc}
sudo chown -R cloudera-scm:cloudera-scm /var/lib/presto
  • 其余参数基本上与 prestoadmin 部署时所用的参数类似,要注意其中的 site.global.app_name 和 application.def 配置项,与本文档的上下文有关联。

7, 配置 resources.json

{
  "schema": "http://example.org/specification/v2.0.0",
  "metadata": {
  },
  "global": {
    "yarn.vcores": "1"
  },
  "components": {
    "slider-appmaster": {
      "yarn.label.expression":"presto_coordinator"
    },
    "COORDINATOR": {
      "yarn.role.priority": "1",
      "yarn.component.instances": "1",
      "yarn.component.placement.policy": "2",
      "yarn.label.expression":"presto_coordinator",
      "yarn.memory": "1500"
    },
    "WORKER": {
      "yarn.role.priority": "2",
      "yarn.component.instances": "2",
      "yarn.component.placement.policy": "2",
      "yarn.label.expression":"presto_worker",
      "yarn.memory": "1500"
    }
  }
}

其中 yarn.component.instances 表示对应角色的实例数,因为采用的是 mutlinode 模式部署,也就是 COORDINATOR 和 WORKER 不共用同一台机器,所以两个实例数之和不能大于机器的数量。yarn.vcores 和 yarn.memory 可用设定向 YARN 申请的 container 资源数量。

当 container 挂掉之后 slider-appmaster 会向 yarn 重新提交申请重启某个服务,需要将yarn.component.placement.policy设置为 2

参考链接:

8, 启动 Slider

先将编译好的 presto-yarn-package-1.6-SNAPSHOT-0.167.zip 及 appConfig.json,resources.json 上传到与 slider-0.92.0-incubating 同一台机器上。

cd slider-0.92.0-incubating
# 部署 presto-yarn-package
bin/slider package --install --name PRESTO --package ../presto-yarn/presto-yarn-package-1.6-SNAPSHOT-0.167.zip

# 启动 applicaiton,也就是部署presto服务
bin/slider create prestoapp --template /opt/bigdata/presto-yarn/appConfig.json  --resources /opt/bigdata/presto-yarn/resources.json --queue datax

# 管理 applicaiton
bin/slider status prestoapp 
bin/slider start prestoapp
bin/slider stop prestoapp

# 查看 coordinator 部署在哪个节点,可以作为-- server 参数提供给 presto-cli 使用
bin/slider registry --name prestoapp --getexp presto

# 销毁 applicaiton
bin/slider destroy prestoapp --force

# 删除 presto-yarn-package
bin/slider package --delete --name PRESTO

# 动态调整 applicaiton,增加或删除 WORKER 数
bin/slider flex prestoapp --component WORKER 2

Presto 配置文件存放目录

presto server 安装在yarn.nodemanager.local-dirs下面,在 appConfigs.json 中指定了日志及配置文件在/var/lib/presto/下面

至此,我们简要的提到了利用 haproxy 搭建 Hive 和 Hue,并且详细介绍了如何利用 Slider 将 Presto 搭建在 Yarn 集群中,接下来将介绍这些组件中的权限管理方案。

权限管理

Hue 账号体系

Hue 是一个开源的 Apache Hadoop UI 系统,它是基于 Python Web 框架 Django 实现的,Hue 在数据库方面,默认使用的是 SQLite 数据库来管理自身的数据,包括用户认证和授权,另外,可以自定义为 MySQL 数据库、Postgresql 数据库、以及 Oracle 数据库。在本文中我们使用了 MySQL 数据库来管理 Hue 和 Hive 的元数据。

在 Hue 源代码的hue/desktop/core/src/desktop/auth/views.py中利用 dt_login 函数对用户的登陆行为进行验证,为了将 hue 用户体系集成到 hive 和 presto 中,我们新增了 check_account 接口进行用户密码校验,通过可选参数 show_permissions 也可以返回该用户的对 hive 数据库的权限。

# hue/desktop/core/src/desktop/urls.py新增check_account接口
dynamic_patterns += patterns('desktop.auth.api',
  (r'^api/check_account/$', 'check_account'),
)
# 新建hue/desktop/core/src/auth/api.py实现check_account方法
@csrf_exempt
@login_notrequired
def check_account(request):
  response = {'status': -1, 'message': ''}
  backend_names = get_backend_names()
  is_active_directory = 'LdapBackend' in backend_names and ( bool(LDAP.NT_DOMAIN.get()) or bool(LDAP.LDAP_SERVERS.get()) )

  if is_active_directory:
    AuthenticationForm = auth_forms.LdapAuthenticationForm
  else:
    AuthenticationForm = auth_forms.AuthenticationForm

  if request.method == 'POST':
    auth_form = AuthenticationForm(data=request.POST)
    if auth_form.is_valid():
      response['status'] = 0
      if request.GET.get("show_permissions"):
        user = auth_form.get_user()
        
        from beeswax.server.dbms import get_query_server_config, HiveServer2Dbms
        from beeswax.server.hive_server2_lib import HiveServerClientCompatible, HiveServerClient
        from beeswax.models import QueryHistory
        from beeswax.design import hql_query
        
        hive_server = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(get_query_server_config(), user)), QueryHistory.SERVER_TYPE[1][0])
        handle = hive_server.execute_and_wait(hql_query('show role grant user %s;' % user.username), timeout_sec=30.0)
        if handle:
          result = hive_server.fetch(handle, rows=5000)
          hive_server.close(handle)
          response['message'] = list(result.rows())
        else:
          response['message'] = []
      else:
        response['message'] = _('Auth succeeded')
      return JsonResponse(response)
    else:
      response['status'] = -2
      response['message'] = auth_form.errors
      return JsonResponse(response)
  else:
    response['message'] = _('POST request only')
  return JsonResponse(response)

Hive 自定义超级用户

Hive 从 0.10 版本(包含 0.10 版本)以后可以通过元数据来控制权限,默认情况下,所有用户只要是指向同一个元数据,就具备相同的操作所有Hive表的权限,进而操作 HDFS,即超级管理员的权限。在 Cloudera Manager 配置hive-site.xml的 Hive 服务高级配置代码段,可以开启权限验证。

<property>
     <name>hive.security.authorization.enabled</name>
     <value>true</value>
     <description>enable or disable the hive client authorization</description>
</property>

<property>
     <name>hive.security.authorization.createtable.owner.grants</name>
     <value>ALL</value>
     <description>the privileges automatically granted to the owner whenever a table gets created. An example like"select,drop" will grant select and drop privilege to the owner of the table</description>
</property>

<property>
     <name>hive.security.authorization.task.factory</name>
     <value>org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl</value>
</property>

hive.security.authorization.enabled:是开启权限验证,默认为 false。
hive.security.authorization.createtable.owner.grants:是指表的创建者对表拥有所有权限,例如创建一个表 table1,这个用户对表 table1 拥有 SELECT、DROP 等操作。还有个值是 NULL,表示表的创建者无法访问该表,这个肯定是不合理的。

可以添加超级权限用户,限定只有某些用户可以执行重要指令,例如create database, grant role, revoke role等等。实现方式是实现 Hive 的扩展函数,本节中通过实现AbstractSemanticAnalyzerHook函数限定了只有 hive 用户为超级用户,抛出的 SemanticException 会出现在日志中或者 beeline 的界面中。

package cn.caijiajia.hivex.hook;

public class AuthHook extends AbstractSemanticAnalyzerHook {
    private static String adminUser = "hive";

    @Override
    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
                              ASTNode ast) throws SemanticException {
        switch (ast.getToken().getType()) {
            case HiveParser.TOK_CREATEDATABASE:
            case HiveParser.TOK_DROPDATABASE:
            case HiveParser.TOK_CREATEROLE:
            case HiveParser.TOK_DROPROLE:
            case HiveParser.TOK_GRANT:
            case HiveParser.TOK_REVOKE:
            case HiveParser.TOK_GRANT_ROLE:
            case HiveParser.TOK_REVOKE_ROLE:
                String userName = null;
                if (SessionState.get() != null
                        && SessionState.get().getAuthenticator() != null) {
                    userName = SessionState.get().getAuthenticator().getUserName();
                }
                if (!adminUser.equalsIgnoreCase(userName)) {
                    throw new SemanticException("Permission denied. " + userName + " can't use ADMIN options.");
                }
                break;
            default:
                break;
        }
        return ast;
    }
}

同样在 hive-site.xml 中添加如下片段,指定自定义的 hook 扩展函数,注意修改其中的 package 为自己的名称。将 hivex 工程生成 jar 包放到/var/lib/hive中之后重启 hive 就可以生效。

<property>
     <name>hive.semantic.analyzer.hook</name>
     <value>cn.caijiajia.hivex.hook.AuthHook</value>
</property>

Hive 用户密码管理

一般来说访问 Hive 的方式包括 hue 服务连接、beeline 连接、jdbc 连接,但是在上文中只是校验了用户名称,并没有校验密码,因此还需要再写一个 hook 函数完成密码校验。本文提供的解决方案是集成 hue 的用户体系,扩展PasswdAuthenticationProvider函数,通过上节中扩展的 hue 接口 check_account 来校验密码。

package cn.caijiajia.hivex.hook;

public class CheckPasswordHook implements PasswdAuthenticationProvider {

    @Override
    public void Authenticate(String username, String password)
            throws AuthenticationException {
        try {
            Content content = Request.Post(LatteDataConfig.CHECK_ACCOUNT_HUE_API)
                    .addHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
                    .bodyForm(Form.form()
                            .add("username", username)
                            .add("password", password)
                            .build())
                    .execute().returnContent();

            JSONObject jObject = JSON.parseObject(content.toString());
            if (jObject.getInteger("status") != 0) {
                throw new AuthenticationException("Invalid password of user: " + username);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new AuthenticationException("Hue api error", e);
        }
    }
}

同样需要在hive-site.xml中指定自定义的扩展函数。

<property>
     <name>hive.server2.authentication</name>
     <value>CUSTOM</value>
</property>
<property>
     <name>hive.server2.custom.authentication.class</name>
     <value>cn.caijiajia.hivex.hook.CheckPasswordHook</value>
</property>

presto-jdbc 用户管理

Hadoop 集群中常用 Kerberos 来进行统一安全认证,Presto 中也支持 Kerberos,但是在公司实际应用环境中,数据分析人员主要在跳板机隔离环境中通过 DBeaver 工具来访问 Presto,所以在没有引入 Kerberos 的条件下我们可以简单的在 presto-jdbc 包中进行修改,集成 Hue 用户体系完成用户密码验证以及数据库的权限验证。

com.facebook.presto.jdbc.PrestoDriver的 connect 方法中新建了 PrestoDriverUri 实例 uri,因此在uri.setupClient(OkHttpClient.Builder builder)中在创建连接之前先从 hue 的接口中校验密码并且获取当前用户拥有权限的数据库列表 authSchemas,然后将 authSchemas 传递给 PrestoConnection 为提供给接下来的 SQL 解析校验权限使用。

  /**
     * 创建连接之前先校验用户密码,获取用户拥有权限的数据库列表
     * @param url
     * @param info
     * @return
     * @throws SQLException
     */
    @Override
    public Connection connect(String url, Properties info)
            throws SQLException
    {
        if (!acceptsURL(url)) {
            return null;
        }

        PrestoDriverUri uri = new PrestoDriverUri(url, info);

        OkHttpClient.Builder builder = httpClient.newBuilder();
        List<String> authSchemas = uri.setupClientAndGetAuthSchemas(builder);
        QueryExecutor executor = new QueryExecutor(builder.build());

        return new PrestoConnection(uri, executor, authSchemas);
    }

com.facebook.presto.jdbc.PrestoStatement的 execute 方法中执行 startQuery 方法之前可以新增一个校验方法,根据 authSchemas 来判断该用户执行的 SQL 语句是否包含了不合法的操作,例如访问了无权限的数据库。

Presto 数据查询引擎实际使用的是 Antlr 进行自定义的 SQL 解析,在com.facebook.presto.sql.parser.SqlParser中提供了 createStatement 方法,所以只需要将 SQL 传入解析器得到一个 Statement,根据 Statement 的详细情况遍历其节点及子节点并解析其内容就得到相关的数据表和列,解析过程中同样根据 AST 的路径重新生成了一棵树,通过遍历树就可以知道表别名及真实表的对应关系,以及字段和数据表的真实对应关系。

如下图所示中,在 Hue 中 Auth API 负责对外提供用户密码鉴权功能,同时也可以返回某个用户拥有权限的数据库列表。Stats 应用作为数据收集器,对外提供API负责接收并保存用户通过 Hive 或者 Presto 查询的具体 SQL 语句。

总结

本文简单介绍了大数据交互平台常用组件 Hive 和 Hue 以及 Presto 的搭建方案,重点介绍如何利用 Slider 搭建 Presto on Yarn 平台。另外介绍了如何将用户访问数据的权限体系与 Hue 保持统一,通过扩展 Hue 开放新的接口,实现 Hive 的 hook 函数来完成自定义用户管理,在公司的实际应用场景下修改 presto-jdbc 来简单的完成数据权限控制,如果要做到严格的 presto 权限控制可能需要修改 presto-server 代码。

另外,为了分析和统计哪些表被访问的更频繁一些,或者为了发现哪些表被创建之后实际上并没有人进行访问而只是浪费了资源,可以将数据分析人员每次提交的 SQL 以及所涉及到的表记录下来。在 presto-jdbc 中我们已经通过 presto 的解析器获取了语法树,可以直接记录下来。在 Hive 中可以实现 ExecuteWithHookContext 方法新加一个 hook 函数,hookContext 中可以直接获取当前用户使用的 SQL 语句 read 或者 write 了哪些表。

本文作者:Allen@张良慧

数据驱动力,驱动什么?如何驱动?

一个企业如果单独成立了数据团队,那么会期望这个团队给企业的发展带来什么呢?浅层次的可能是一些数据备份、数据安全方面的需求;深一点的,可能是数据清洗和整合,让散落在各个业务系统的数据都汇聚在一起,做成数仓,然后形成规律的报表;更高级的需求可能是期望这个团队具备数据分析、数据挖掘甚至是机器学习的能力,能够发现业务方自身难以发现的问题,或者是从海量的数据中挖掘出有用的信息供业务回炉使用。

从这三个层次的需求来看,前面两个是偏“数据支持”方面的,第三个才称得上是“数据驱动”。这两个概念并非互相独立,而是一种层次递进的关系。为了能更好的回答“数据能驱动什么以及如何驱动”,我们先来看看数据团队应该如何做好基本的数据支持工作,然后在这个基础上如何通过提高运转效率、组织创新来达到驱动业务发展的目的。

为了做好数据支持,先要明确数据团队在企业中基本职责是什么,然后找到与其他团队一起高效合作的工作方式,最后要根据企业的特质,迭代出符合当前企业发展需求的团队组织架构。

数据团队的基本职责,可以从四个方面来阐述基本工作内容。它们分别是数据平台、数据仓库、数据应用以及数据工具。

具体罗列开来就是下面这些。

  • 运维大数据集群,维护数据作业调度;
  • 在保证数据完整、准确、安全的前提下,
    • 做好企业全量数据的存储、备份、冷热分离;
    • 搭建和维护离线、准实时、实时数据流,满足业务方不同时效性的用数需求;
    • 开发数据仓库、整理数据字典,让任何量级、任何维度、任何用途的数据可查询,可导出;
    • 开发数据应用,让数据以有意义的方式展现和输出;
    • 运维数据访问工具,研究并推广新型数据处理、数据分析工具;

那么在开发数据仓库、数据应用的过程中,数据团队应该如何和其他团队协作工作的呢?一起来看看下面这张图。

这个图的重点(中心)揭示了数据团队是需要对业务方提供“快速响应”的数据服务的。基于这个环来提供数据支持服务的时候,容易在两个环节出现“掉链”。一个是“提出需求->接受需求”阶段。一个企业里面业务条线较多,市场、产品、财务、运营、商务等,他们提出来的数据需求往往都带有比较深的领域知识,而接受并消化这些需求的人一般是数据产品经理,这个岗位对人的要求非常高,也及其稀缺,领域知识的缺乏会影响需求的传递质量,需求的落地也会经常出现不符合预期的情况。另外一个阶段就是“指导业务->提出需求”。数据团队的任何产出,总是期望能给业务提供正向意义的。而实际情况往往是提供出去的数据应用或者报表,业务方看过之后并不会主动、及时的反馈是否真的有用,需要产品经理多次去寻求反馈,或者征求调整的建议。这个环节非常容易被忽略,导致这个环的运转效率变低。

接下来谈谈数据团队的组织架构。这个月(7月11日)清华大学校发布了国内首份《顶级数据团队建设全景报告》,里面提到了目前企业里面两种普遍存在的大数据团队组织结构。一种是金字塔型,一种是矩阵型,分别对应于下面两个图。

金字塔型在互联网科技行业较为常见,其特点是扁平化的管理,执行力强,沟通成本低,但缺点也很明显,就是团队离业务较远,容易出现产出不符合业务预期的情况。

矩阵型在传统行业比较常见,比如交通银行、电信企业等,这种的数据团队通常没有一个明确的数据负责人。数据岗位是分布在各个业务模块中的。每个小团队分别负责各个业务模块的不同数据需求。

两种组织结构各有优劣,其对比效果如下图所示:

看到这个结果以后,我们就需要思考,到底那种形式的组织架构适合企业当前的发展阶段呢?如果两个都不合适,有没有第三种折中的方案呢?答案是有的。可以将数据采集、存储、清洗、整合等底层数据工程面的工作放在数据小组,数据需求整理、数据分析、决策分析等偏业务的工作让业务部门专人负责,也就是让每条业务线都设立一个“数据产品经理”的职位,他们和数据团队的人员一起做项目,一来负责需求的分析和落地,二来负责专业的数据分析工作。另外各个数据产品经理还可以根据其自身的业务特色,主导并组织跨部门人员形成虚拟工作小组,重点突击,完成不同阶段的重点目标。具体形式如下图所示(DPM:数据产品经理):

图中有一个未知因素X(图中最左侧),这里需要一个强有力的角色来推动该组织形式的落地。因为会涉及跨部门甚至是跨分公司的协作,至少需要从企业高级决策层出发才能保障这种组织架构平稳落地并产生效能。

每个企业在不同的发展阶段,需要数据团队的产出不尽相同。企业成立初期,可能只需要数据团队搭建好数据平台,做好数据汇聚和备份的工作;在业务启动阶段,可能需要数据团队最好基本数仓开发,做好常规报表,基于数据监控业务发展动态等;在攻城拔寨阶段,可能需要数据团队在全面做好数仓的同时,还能提供数据分析能力,能产出有价值的报表,发现业务团队自身难以发现的变化趋势,亦或做出合理的预测,指导业务团队重点突击;在产品稳步增长阶段,可能需要数据团队做好存量用户分析,为用户经营提供策略支持,甚至还可能需要尝试增长黑客相关的尝试,通过大数据的技术红利来为获客提供不同寻常的思路。

回到标题的问题,企业里的数据团队到底能驱动什么?该如何驱动呢?能驱动什么,需要根据企业当前的发展阶段来制定目标,由企业经营决策层来拍板推动,比如说“数据驱动增长”,“数据驱动产品优化”,“数据驱动运营”等等。至于如何驱动,相信看过上面的数据团队的职责介绍、运转机制、组织架构优化,每个人都有自己的答案了吧。

数据可视化之图表选择指南

一提到“数据可视化”,人们的大脑立马会浮现出各式各样的图表。最基本的有散点图、柱状图、饼状图、折线图,复杂一点的有气泡图、K线图、雷达图、桑基图等,更加复杂还有热力图、南丁格尔玫瑰图、和弦图。每种图都有各自的适用场景,不同的数据,如何选择合适的图表来表达,是本文探讨的主要目的。

首先,我们来看看网络上能找到的一个比较全的图表类目信息。

 

上面罗列的所有图表,去重以后合计39种。其中K线图在趋势类、比较类和时间类都出现了,层叠面积图也在时间类、占比类以及区间类都出现了,说明同一种图表,可以在不同的场合用来传递不同的意义。如此复杂多样的图表,仅仅根据这个基本的分类来决策如何选用它们是远远不够的。在实际操作过程中,想要选出最合适的图表,除了要对这些图表的基本视觉特征都比较熟悉以外,还要反过来从数据本身的角度出发,对数据做定性和定量分析,然后结合表达意图,反复的去套用其中一种或多种,找出视觉效果最直接、实现最低的那一种。

无意中我发现美国的Andrew Abela博士在2006年发布了一篇博客“Choosing a good chart”,给出了“图表选择的终极指南”。其核心内容是一个思维导图,如下图所示:

下面我用白话文把这个思维导图翻译一遍,带着大家走一遍每个分支,看看在决策图表的时候到底需要考虑哪些因素。

第一步:问自己“想要展示什么”?答案会落入四个选项:“数据的对比”、“数据的分布”、“数据之间的关系”以及“数据的组成”。

第二步:定位到其中一个明确目的以后,继续深挖数据的特性以及逻辑关系。

如果是对比类,数据是基于“分类”的对比还是基于“时间”的对比?如果是基于“分类”的,每个分类需要对比的变量是一个还是多个?如果是多个,用“不等宽柱状图”;如果是一个,先看分类信息是否可以再分大类,大类较多的话用“内嵌柱状图的表格”,大类较少的话就用条形图或者柱状图(含堆积柱状图)。如果是基于“时间”来做对比的,要看是基于多周期的对比还是极少周期的对比。如果是多周期的, 循环数据用“雷达图”,非循环数据用“曲线图(折线图)”。基于少数周期的对比,分类较少的话用“柱状图”,分类多的话用“组合曲线图”。

如果是分布类,要看数据的维度,可分为“一维(单变量)”、“二维(双变量,XY轴)”和“三维(三变量,XYZ轴)”分布。单维度的分布,数据点较少的话用“直方图”,数据点很多就用“正太分布图”;二维的数据用“散点图”;三维的数据用“曲面图”。超过三维,无法可视化。

如果是数据组成类,要看数据是动态还是静态的。动态是指数据是随时间变化的。如果是静态数据,想看各类目在整体的占比用“饼状图”,想了解数据的求和或者差值与整体的关系用“瀑布图”,想看每个大类里面小类的占比,可以用“堆积百分比柱状图”。对于动态数据,周期少的话,仅看相对差异用“堆积百分比柱状图”,想同时看到绝对差异用“堆积柱状图(显示绝对值)”。周期多的,看相对差异用“堆积百分比面积图”,同时想看绝对差异用“堆积面积图”。

如果是数据关联类,看两个变量之间的关系用散点图,三个变量之间的关系用气泡图。

这份思维导图给我们提供了一种图表决策的思考模式。还有一些比较复杂的图表虽然没有涉及到,但是只要经过类似的推导(一系列的自问),总会找到合理的启发,最终找出最“心满意足”的图标。

下面这份思维导图的中文版。

关于图表的分类以及每种图表的详细介绍,大家可以前往蚂蚁金服推出的AntV官方网站做更深入的研究。另外,还有在线ChartChooser的网站,提供了各种图表的Excel/PowerPoint模板。

人类如何感知数据

数据,对于计算机而言,只是0和1的汇总,对于人类而言,它承载的更多的是事实。一份数据,所能承载的信息可以分为两类:“量化信息(Quantitavive)”和“分类信息(Categorical)”。人们在面对一份数据时,会不自觉的通过对比和推演,尝试找出数据之间的关系,或者数据变化的趋势,最终形成相应的结论。原始的数据,没有经过整理和转换,人们在“感知”的时候就会非常低效,最后也很难形成结论。所以数据想要被理解,被高效的理解,就需要重新Encode,然后以合适的形式展现出来,这就是数据可视化的过程。

数据可视化的目的是为了方便交流,是作者想要以最低的成本让观众理解数据背后的含义,最终在每个人的大脑里形成一致的观点。数据可视化是数据和艺术设计的结合体,好的作品是内容和美学平衡点。

那么人类是如何通过可视化作品来理解数据呢?下面这个图展示了整个流程,大致来讲是经历三个阶段:感知,解释以及理解。

WechatIMG883

在“感知”这个阶段,人们会尝试去阅读数据,阅读的目的为了弄清楚下面几个问题:

1, 展示了什么数据(分类信息)?
2, 数据量大不大,最小值,最大值,平均值等指标分别是多少(量化信息)?
3, 数据如何对比?
4, 数据之间的关系是什么?

在“解释”这个阶段,人们会结合背景信息以及提示文字,初步形成自己的结论:

1, 数据说明什么问题?
2, 揭示的问题是好还是坏?
3, 数据是否有意义,是否重要?
4, 数据是符合预期还是异常?

在“理解”阶段,人们会思考数据背后的含义,结合自身的业务、领域知识,会尝试去证明自己在上一个步骤对数据的解释是行得通的,然后形成最终的结论。

1, 数据对我(我的业务)有什么意义?
2, 作者想表达主要意图是什么?
3, 我通过数据学到了什么?
4, 我是否该采取行动?

数据可视化的核心永远是“对比”。不管借助何种图形,使用了多少表达手段,可视化的作者的终极目的是想让观众能对数据的某些方面进行“高效的对比”。通过对比不同种类的数据,能发现数据之间的关系;通过对比同类数据不同时间(阶段)的值,能发现数据的发展规律;通过对比,还能发现数据的分布状态、离散程度等。可视化作品,是以图的形式,让人通过视觉处理,使得任何一个维度的“对比”更加简单、清晰,甚至“更有趣”。

在了解了“人类是如何感知数据”以后,我们需要进一步挖掘,有哪些可视化的基本元素可以用来表达数据的“量化信息”和“分类信息”呢?有人可能会说,是不是选择“合适的可视化图表”就好了。答案是任何一种图表都是可视化表达的最终形式,但并不是基本元素。“分类信息”相对来说比较简单,一般通过颜色区分,加上文字标注就能非常精确的传达信息。“量化信息”的表达方式比较多样,早在1985年Cleveland和McGill发表的论文《Graphical Perception and Graphical Methods for Analyzing Scientific Data》就提出来,总共有7种基本元素(也可以称之为途径)来表达数据的量化信息,并且按照顺序从低到高,表达的误差越来越大。

1, Position along a common scale:位置(同一坐标系)
2, Position on identical but nonaligned scales: 位置(不同坐标系)
3, Length:长度
4, Angle or Slope:角度或者斜率
5, Area:面积
6, Volume or Density or Color saturation:体积、密度或者颜色的饱和度
7, Color hue:色彩

制作可视化图表过程,就是将数据转化成上述一种或者多种元素,然后优雅的组织在一起,方便读者以最快的速度、最愉悦的心情读懂作者想表达的意思。

人类感知数据的过程是一个非常复杂的过程。除了上面提到的“数据encode,视觉decode”,还涉及到人类是如何感知图片的。人类似乎天生就具备视觉解码的能力,只是每个人的能力水平不一样。有的人对色彩更敏感,有的人对模式识别更擅长,还有的人对预估(对趋势的预判)更加“直觉”。所谓“一图胜千言”,数据可视化正式借助了人类强大的视觉处理能力来达到“让数据说话”的目的。

Hello, DataXV

DataXV,我对它的定义如下:

Data – BigData, Data Platform;
X – Extreme, Excellent;
V – Valuable, Visualization.

数据是客观的,也有可能是杂乱的。如何让数据说话,体现数据的价值,是本站探索的终极目的。数据驱动增长,数据可视化,数据挖掘等等,有趣,有挑战,欢迎志同道合的人士来一起探讨、积累、成长。

投稿:wp[at]dataxv.io