标签归档:flink

携程基于Flink的实时特征平台

本文来自于flink-china,本文主要介绍原实时特征作业的开发痛点、特征平台系统架构以及选择Flink的原因等。

本文主要内容如下:

在公司实时特征开发的现状基础上,说明实时特征平台的开发背景、目标以及现状

选择Flink作为平台计算引擎的原因

Flink的实践:有代表性的使用示例、为兼容Aerospike(平台的存储介质)的开发以及碰到的坑

当前效果&未来规划

一、在公司实时特征开发的现状基础上,说明实时特征平台的开发背景、目标以及现状

1、原实时特征作业的开发运维

1.1、选择实时计算平台:依据项目的性能指标要求(latency,throughput等),在已有的实时计算平台:Storm Spark flink进行选择

1.2主要的开发运维过程:

80%以上的作业需要用到消息队列数据源,但是消息队列为非结构化数据且没有统一的数据字典。所以需要通过消费对应的topic,解析消息并确定所需的内容

基于需求中的场景,设计开发计算逻辑

在实时数据不能完全满足数据需求的情况,另外开发单独的离线作业以及融合逻辑;

例如:在需要30天数据的场景下,但消息队列中只有七天内的数据时(kafka中消息的默认保留时间),剩下23天就需要用离线数据来补充。

设计开发数据的校验和纠错逻辑 消息的传输需要依赖网络,消息丢失和超时难以完全避免,所以需要有一个校验和纠错的逻辑。

测试上线

监控和预警

2、原实时特征作业的开发痛点

消息队列数据源结构没有统一的数据字典

特征计算逻辑高度定制化,开发测试周期长

实时数据不能满足需求时,需要定制离线作业和融合逻辑

校验和纠错方案没有形成最佳实践,实际效果比较依赖个人能力

监控和预警方案需要基于业务逻辑定制

3、基于整理的痛点,确定下来的平台目标

实时数据字典:提供统一的数据源注册、管理功能,支持单一结构消息的topic和包含多种不同结构消息的topic

逻辑抽象:抽象为SQL,减少工作量&降低使用门槛

特征融合:提供融合特征的功能,解决实时特征不能完全满足数据需求的情况

数据校验和纠错:提供利用离线数据校验和纠错实时特征的功能

实时计算延迟:ms级

实时计算容错:端到端 exactly-once

统一的监控预警和HA方案

4、特征平台系统架构

现在的架构是标准lamda架构,离线部分由spark sql + dataX组成。现在使用的是KV存储系统Aerospike,跟redis的主要区别是使用SSD作为主存,我们压测下来大部分场景读写性能跟redis在同一个数据量级。

实时部分:使用flink作为计算引擎,介绍一下用户的使用方式:

注册数据源:目前支持的实时数据源主要是Kafka和Aerospike,其中Aerospike中的数据如果是在平台上配置的离线或者实时特征,会进行自动注册。Kafka数据源需要上传对应的schemaSample文件

计算逻辑:通过SQL表达

定义输出:定义输出的Aerospike表和可能需要的Kafka Topic,用于推送Update或者Insert的数据的key

用户完成上面的操作后,平台将所有信息写入到json配置文件。下一步平台将配置文件和之前准备好的flinkTemplate.jar(包含所有平台所需的flink功能)提交给yarn,启动flink job。

5、平台功能展示

1)平台功能展示-数据源注册

2)实时特征编辑-基本信息

3)实时特征编辑-数据源选择

4)实时特征编辑-SQL计算

5)实时特征编辑-选择输出

二、选择Flink的原因

我们下面一个我们说一下我们选择flink来做这个特征平台的原因。

分为三个维度:最高延迟、容错、sql功能成熟度

延迟:storm和flink是纯流式,最低可以达到毫秒级的延迟。spark的纯流式机制是continuous模式,也可以达最低毫秒级的延迟

容错:storm使用异或ack的模式,支持atLeastOnce。消息重复解决不。spark通过checkpoint和WAL来提供exactlyOnce。flink通过checkpoint和SavePoint来做到exactlyOnce。

sql成熟度:storm现在的版本中SQL还在一个实验阶段,不支持聚合和join。spark现在可以提供绝大部分功能,不支持distinct、limit和聚合结果的order by。flink现在社区版中提供的sql,不支持distinct aggregate

三、Flink实践

1、实例

2、兼容开发:flink现在没有对Aerospike提供读写支持,所以需要二次开发

3、碰到的坑

四、平台当前效果&未来规划

当前效果:将实时特征上线周期从原平均3天-5天降至小时级。未来规划:

完善特征平台的功能:融合特征等

简化步骤,提高用户体验

根据需求,进一步完善SQL的功能例如支持win的开始时间offset,可以通过countTrigger的win等

下一步的规划是通过sql或者DSL来描述模型部署和模型训练

来源:http://www.uml.org.cn/bigdata/201811223.asp

实时计算 Flink SQL 核心功能解密

实时计算 Flink SQL 核心功能解密

Flink SQL 是于2017年7月开始面向集团开放流计算服务的。虽然是一个非常年轻的产品,但是到双11期间已经支撑了数千个作业,在双11期间,Blink 作业的处理峰值达到了5+亿每秒,而其中仅 Flink SQL 作业的处理总峰值就达到了3亿/秒。Flink SQL 在这么短的时间内支撑了如此多的业务,与其稳定的内核、完善的功能、强大的生态是分不开的。

本文会带着大家一起来揭开 Flink SQL 核心功能的面纱(API上我们将尽可能的和Flink社区保持一致,这样才能够更好的融入开源的生态,所以我们将API叫做Flink SQL,而不是Blink SQL。事实上flink社区的SQL绝大部分是我们阿里的工程师贡献的:3个 Flink Committer,10+ Contributor,贡献 80% 的SQL 功能,近200个 commit,近十万行的代码)。

为什么是 SQL?

Blink 将 SQL 定位为其最核心的 API。为什么是 SQL 而不是 DataStream API 呢?因为 SQL 具有以下几个优点:

声明式。用户只需要表达我想要什么,至于怎么计算那是系统的事情,用户不用关心。

自动调优。查询优化器可以为用户的 SQL 生成最有的执行计划。用户不需要了解它,就能自动享受优化器带来的性能提升。

易于理解。很多不同行业不同领域的人都懂 SQL,SQL 的学习门槛很低,用 SQL 作为跨团队的开发语言可以很大地提高效率。

稳定。SQL 是一个拥有几十年历史的语言,是一个非常稳定的语言,很少有变动。所以当我们升级引擎的版本时,甚至替换成另一个引擎,都可以做到兼容地、平滑地升级。

流与批的统一。Blink 底层 runtime 本身就是一个流与批统一的引擎。而 SQL 可以做到 API 层的流与批统一。

我们认为这 5 点对于用户的易用性是非常重要的,而以上 5 点却是 DataStream API 所不具备的。所以 Blink 将 SQL 定位为最核心的 API,而不是 DataStream API。

关于流与批的统一是现在业界非常火热的一个话题,Flink SQL 的流与批统一总结起来就一句话:One Query, One Result。在很多场景,我们既需要批处理,又需要流处理。比如,使用批处理一天跑一个全量,同时使用流处理来做实时的增量更新。在以前经常需要维护两套引擎,写两个 Job,两个 Job 之间还要维护逻辑的一致性,这增加了很多的工作量。如果使用 SQL 的话,我们可以让一份 SQL 代码既跑在批模式下,又跑在流模式下,这样用户只需要维护一份 SQL 代码,这是One Query。而One Result是说,同一份 SQL 代码,在流模式下和批模式下跑出来的结果是一样的,也就是保证了流式 SQL 的语义正确性。

我们注意到 SQL 是为传统批处理设计的,不是为流处理设计的。比如说传统 SQL处理的数据是有限的,而且SQL查询只返回一个结果并结束。但是流上的查询,处理的数据是无限的,不断产生结果且不会结束。所以说传统 SQL 标准中很多定义无法直接映射到流计算中。那么如何在流上定义 SQL 呢?这里需要引出 Flink SQL 的核心概念:流与表的二象性。

Flink SQL 核心概念

动态表 & 流表二象性

传统的 SQL 是定义在表上的,为了能在流上定义 SQL,我们也需要有一个表的概念。这里就需要引入一个非常重要的概念:动态表(Dynamic Table)。所谓动态表,就是数据会随着时间变化的表,可以想象成就是数据库中一张被不断更新的表。我们发现流与表有非常紧密的关系,流可以看做动态表,动态表可以看做流。我们称之为流表二象性(duality)。

如上图所示,一个流可以看做对表的一系列更新操作(changelog),将流从头开始重放就可以构造成一个动态表。而动态表的每次更新操作都会记录下 changelog,通过抽取出动态表的 changelog 可以很轻松地得到原始的数据流(类似的思想也被应用于数据库同步中,如集团的DRC产品)。因此流可以转换成动态表,动态表又能转成流,他们之间的转换不会丢失任何信息,且保留了一致的 schema。流是动态表的另一种表现形式,动态表也是流的另一种表现形式,所以说流与表是一种二象性的关系。

连续查询

上文说到动态表是流的另一种表现形式,有了动态表后,我们就可以在流上定义 SQL 了。流式 SQL 可以想象成连续查询(Continuous Query)。传统的查询是只运行一次 SQL,产生一个结果就结束了。连续查询会一直运行在那里,当每个数据到来,都会持续增量地更新计算结果,从而产生另一个动态表。而这个结果动态表(也就是流)会作为另一个 SQL(连续查询)的输入接着计算,从而串起整个数据流图。

Flink SQL 核心功能

从 2016 年到 2017 年,Flink SQL 从无到有,迅速发展,解决多个 Stream SQL 领域的难点痛点,快速支持业务的需求。终于在今年的双11,Flink SQL 支撑了大量的双11业务,这与其丰富的上下游系统、完善的功能是离不开的,包括双流 JOIN,维表 JOIN,TopN,Window,多路输出等等。

打通集团上下游系统

Flink SQL 接入了集团内常见的十多种上下游系统,包括了11种结果表插件、5种源表插件、4种维表插件。只需要声明对接系统的类型,就能完成上下游系统的连接,将你从阿里云存储五花八门的 SDK 中解放出来。详见 《Flink SQL 功能解密系列 —— 阿里云流计算/Blink支持的connectors》

高级功能

双流 JOIN

双流 JOIN 功能是将两条流进行关联,用来补齐流上的字段。双流 JOIN 又分为无限流的双流 JOIN 和带窗口的双流 JOIN。

维表 JOIN

维表 JOIN 功能是流与表的关联,也是用来为数据流补齐字段,只是补齐的维度字段是在外部存储的维表中的。我们为维表 JOIN 做了诸如 Async、cache、multi-join-merge 等优化,使得维表 JOIN 的性能非常优异。具体原理分析和最佳实践可以阅读 《Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化》

TopN

TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。除了全局 TopN 功能外,我们还提供了分组 TopN 的功能。流上的 TopN 有非常多的挑战。具体原理分析和实践推荐阅读 《Flink SQL 功能解密系列 —— 流式 TopN 的挑战与实现》

Window

Flink SQL 简单易用的一大特色就是支持开箱即用的 Window 功能。支持滚动窗口(Tumble)、滑动窗口(Hop)、会话窗口(Session)以及传统数据库中的OVER窗口。具体使用方式可以阅读《Window 文档》

多路输入、多路输出

Flink SQL 利用分段优化支持了多路输出,并且多路输出的共享节点做到了资源的复用,使得不会计算多次。基于多路输入、多路输出的功能,可以将 Flink SQL 作为一个非常简单易用的画数据流的工具,可以很容易地构造出一个有流合并、流拆分的复杂 DAG 作业。

MiniBatch 优化

除此之外,我们还在 SQL 上做了很多的优化。其中 MiniBatch 就是核心优化之一。对于有状态的算子来说,每个进入算子的元素都需要对状态做序列化/反序列化的操作,频繁的状态序列化/反序列化操作占了性能开销的大半。MiniBatch 的核心思想是,对进入算子的元素进行攒批,一批数据只需要对状态序列化/反序列化一次即可,极大地提升了性能。

Retraction 撤回机制

撤回机制是 Flink SQL 中一个非常重要的基石,它解决了 early-fire 导致的结果正确性问题(所有的 GroupBy 都是 early-fire 的)。而利用好撤回机制有时候能够很巧妙地帮助业务解决一些特殊需求。详细的业务应用分析推荐阅读 《Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析》

业务支持情况

借助于阿里云一站式开发平台,用户可以高效地开发 Flink SQL 作业,是业务上线与业务迁移的加速器。目前 Flink SQL 在集团内部已经服务于 双11回血红包、聚划算、飞猪、菜鸟、盒马、云零售、反作弊等数十个业务场景,二十多个 BU,并成功经历双11大促的考验。在双11当天,Flink SQL 的作业更是创下了每秒2.9亿条的处理高峰。为各个业务取得了非常好的效果提供了非常坚实的保障。

阅读原文

【阅读原文...】

Flink原理与实现架构和拓扑概览

架构

要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是Flink集群启动后架构图。

当Flink集群启动后,首先会启动一个JobManger和一个或多个的TaskManager。由客户端提交任务给JobManager,JobManager再调度任务到各个TaskManager去执行,然后TaskManager将心跳和统计信息汇报给JobManager.TaskManager之间以流的形式进行数据的传输。上述三者均为独立的JVM进程。

  • 客户为提交作业的客户端,可以是运行在任何机器上(与JobManager环境连通即可)。提交作业后,客户可以结束进程(流的任务),也可以不结束并等待结果返回。
  • JobManager主要负责调度工作并协调任务做检查点,职责上很像Storm的Nimbus。从客户处接收到工作和JAR包等资源后,会生成优化后的执行计划,并以任务的单元调度到各个TaskManager去执行。
  • TaskManager在启动的时候就设置好了槽位数(Slot),每个插槽能启动一个任务,任务为线程。从JobManager处理接收需要部署的任务,部署启动后,与自己的上游建立Netty连接,接收数据并处理。

可以看到Flink的任务调度是多线程模型,并且不同Job / Task混合在一个TaskManager进程中。虽然这种方式可以有效提高CPU利用率,但是个人不太喜欢这种设计,因为不仅缺少资源隔离机制,同时也不方便调试。类似Storm的进程模型,一个JVM中只跑该Jobs Tasks实际应用中更为合理。

工作例子

本文所示例子为flink-1.0.x版本

我们使用Flink自带的例子包中的SocketTextStreamWordCount,这是一个从socket流中统计单词出现次数的例子。

首先,使用netcat的启动本地服务器:

$ nc -l 9000

然后提交Flink程序

$ bin / flink运行示例/ streaming / SocketTextStreamWordCount.jar \
  --hostname 10.218.130.9 \
  -  9000

在netcat端输入单词并监控taskmanager的输出可以看到单词统计的结果。

SocketTextStreamWordCount 的具体代码如下:

public  static  void  main (String [] args) throws Exception {
  //检查输入
  最终 ParameterTool PARAMS = ParameterTool.fromArgs(参数);
  ...
  //设置执行环境
  最终 StreamExecutionEnvironment ENV = StreamExecutionEnvironment.getExecutionEnvironment();
  //获取输入数据
  DataStream <String> text =
      env.socketTextStream(params.get(“hostname”),params.getInt(“port”),'\ n',0);
  DataStream <Tuple2 <String,Integer >>计数=
      //分开成对的行(2元组),包含:(word,1)
      text.flatMap(new Tokenizer())
          //由元组字段“0”组合,并将元组字段“1”
          .keyBy(0)
          .sum(1);
  counts.print();
  
  //执行程序
  env.execute(“SocketTextStream示例中的WordCount”);
}

我们将最后一行代码替换env.execute成System.out.println(env.getExecutionPlan());并并在本地运行该代码(并发度设为2),可以得到该拓扑的逻辑执行计划图的JSON串,将JSON串粘贴到http://flink.apache.org/可视化/中,能可视化该执行图。

但是并不是最终在Flink中运行的执行图,只是一个表示拓扑节点关系的计划图,在Flink中对应了SteramGraph。另外,提交拓扑后(并发度设为2)还能在UI中看到另一张执行计划图,如下所示,该图对应了Flink中的JobGraph。

图形

看起来有点乱,怎么有这么多不一样的图。实际上,还有更多的图.Flink中的执行图可以分成四层:StreamGraph - > JobGraph - > ExecutionGraph - >物理执行图。

  • StreamGraph:是根据用户通过Stream API编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph: StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构。主要的优化为,将多个符合条件的节点链在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图: JobManager根据ExecutionGraph对工作进行调度后,在各个TaskManager上部署任务后形成的“图”,并不是一个具体的数据结构。

例如上文中的2个并发度(来源为1个并发度)的SocketTextStreamWordCount四层执行图产品的演变过程如下图产品所示(点击查看大图):

这里对一些名词进行简单的解释。

  • StreamGraph:根据用户通过Stream API 编写的代码生成的最初的图。
    • StreamNode:用来代表运算符的类,并具有所有相关的属性,如并发度,入边和出边等。
    • StreamEdge:表示连接两个StreamNode的边。
  • JobGraph: StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构。
    • JobVertex:经过优化后符合条件的多个StreamNode可能会连锁在一起生成一个JobVertex,即一个JobVertex包含一个或多个运营商,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
    • IntermediateDataSet:表示JobVertex的输出,即经过运营商处理产生的数据集.producer是JobVertex,消费者是JobEdge。
    • JobEdge:代表了工作图中的一条数据传输通道.source是IntermediateDataSet,目标是JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
  • ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
    • ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的ExecutionVertex。
    • ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
    • IntermediateResult:和JobGraph中的IntermediateDataSet一一对应每一个IntermediateResult有与下游ExecutionJobVertex相同并发数的IntermediateResultPartition。
    • IntermediateResultPartition:表示ExecutionVertex的一个输出分区,制片人是ExecutionVertex,消费者是若干个ExecutionEdge。
    • ExecutionEdge:表示ExecutionVertex的输入,源是IntermediateResultPartition,目标是ExecutionVertex.source和目标都只能是一个。
    • 执行:执行一个ExecutionVertex的一次尝试。当发生故障或者数据需要重算的情况下ExecutionVertex可能会有多个ExecutionAttemptID。一个执行通过ExecutionAttemptID来唯一标识.JM和TM之间关于任务的部署和任务状态更新都是通过ExecutionAttemptID来确定消息接受者。
  • 物理执行图: JobManager根据ExecutionGraph对工作进行调度后,在各个TaskManager上部署任务后形成的“图”,并不是一个具体的数据结构。
    • 任务:执行被调度后在分配的TaskManager中启动对应的Task.Task包裹了具有用户执行逻辑的运算符。
    • ResultPartition:代表由一个任务的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
    • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费任务数和DistributionPattern来决定。
    • InputGate:代表任务的输入封装,和JobGraph中JobEdge一一对应每个InputGate消费了一个或多个的ResultPartition。
    • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。

那么Flink为什么要设计这4张图呢,其目的是什么呢?Spark中也有多张图,数据依赖图以及物理执行的DAG。其目的都是一样的,就是解耦,每张图各司其职位,每张图对应了工作不同的阶段,更方便做该阶段的事情。我们给出更完整的Flink Graph的层次图。

首先我们看到,JobGraph之上除了StreamGraph还有OptimizedPlan.OptimizedPlan是由Batch API转换而来的.StreamGraph是由Stream API转换而来的。为什么API不直接转换成JobGraph?因为,Batch和Stream的图结构和优化方法有很大的区别,比如批量有很多执行前的预分析用来优化图的执行,而这种优化并不适合流,所以通过OptimizedPlan来做批量的优化会更方便和清晰,也不会影响Stream.JobGraph的责任就是统一Batch和Stream的图,用来描述清楚一个拓扑图的结构,并且做了chaining的优化,chaining是普适于Batch和Stream的,所以在这一层做掉.ExecutionGraph的责任是方便调度和各个任务状态的监控和跟踪,所以ExecutionGraph是并行化的JobGraph。而“物理执行图”就是最终分布在各个机器上运行着的任务了。所以可以看到,这种解耦方式极大地方便了我们在各个 所做的工作,各个层之间是相互隔离的。

后续的文章,将会详细介绍Flink是如何生成这些执行图的。由于我目前关注Flink的流程处理功能,所以主要有以下内容:

如何生成StreamGraph

  1. 如何生成JobGraph
  2. 如何生成ExecutionGraph
  3. 如何进行调度(如何生成物理执行图)

 

来源: https://zhuanlan.zhihu.com/p/27576821

Flink 零基础实战教程:如何计算实时热门商品

在上一篇入门教程中,我们已经能够快速构建一个基础的 Flink 程序了。本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品。在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿用上文的my-flink-project项目框架。

通过本文你将学到:

  1. 如何基于 EventTime 处理,如何指定 Watermark
  2. 如何使用 Flink 灵活的 Window API
  3. 何时需要用到 State,以及如何使用
  4. 如何使用 ProcessFunction 实现 TopN 功能

 

实战案例介绍

本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前N名的商品

数据准备

这里我们准备了一份淘宝用户行为数据集(来自阿里云天池公开数据集,特别感谢)。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、加购、收藏)。数据集的组织形式和MovieLens-20M类似,即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:

列名称说明
用户ID整数类型,加密后的用户ID
商品ID整数类型,加密后的商品ID
商品类目ID整数类型,加密后的商品所属类目ID
行为类型字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
时间戳行为发生的时间戳,单位秒

你可以通过下面的命令下载数据集到项目的 resources 目录下:

$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv

这里是否使用 curl 命令下载数据并不重要,你也可以使用 wget 命令或者直接访问链接下载数据。关键是,将数据文件保存到项目的 resources 目录下,方便应用程序访问。

编写程序

在 src/main/java/myflink 下创建 HotItems.java 文件:

package myflink;

public class HotItems {

  public static void main(String[] args) throws Exception {
    
  }
}

与上文一样,我们会一步步往里面填充代码。第一步仍然是创建一个 StreamExecutionEnvironment,我们把它添加到 main 函数中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了打印到控制台的结果不乱序,我们配置全局的并发为1,这里改变并发对结果正确性没有影响
env.setParallelism(1);

创建模拟数据源

在数据准备章节,我们已经将测试的数据集下载到本地了。由于是一个csv文件,我们将使用 CsvInputFormat 创建模拟数据源。

注:虽然一个流式应用应该是一个一直运行着的程序,需要消费一个无限数据源。但是在本案例教程中,为了省去构建真实数据源的繁琐,我们使用了文件来模拟真实数据源,这并不影响下文要介绍的知识点。这也是一种本地验证 Flink 应用程序正确性的常用方式。

我们先创建一个 UserBehavior 的 POJO 类(所有成员变量声明成public便是POJO类),强类型化后能方便后续的处理。

/** 用户行为数据结构 **/
public static class UserBehavior {
  public long userId;         // 用户ID
  public long itemId;         // 商品ID
  public int categoryId;      // 商品类目ID
  public String behavior;     // 用户行为, 包括("pv", "buy", "cart", "fav")
  public long timestamp;      // 行为发生的时间戳,单位秒
}

接下来我们就可以创建一个 PojoCsvInputFormat 了, 这是一个读取 csv 文件并将每一行转成指定 POJO
类型(在我们案例中是 UserBehavior)的输入器。

// UserBehavior.csv 的本地文件路径
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// 创建 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

下一步我们用 PojoCsvInputFormat 创建输入源。

DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

这就创建了一个 UserBehavior 类型的 DataStream

EventTime 与 Watermark

当我们说“统计过去一小时内点击量”,这里的“一小时”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用户决定。

  • ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。
  • EventTime:事件发生的时间。一般就是数据本身携带的时间。

在本案例中,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做。

第一件是告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。

注:真实业务场景一般都是存在乱序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor

DataStream<UserBehavior> timedData = dataSource
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
      @Override
      public long extractAscendingTimestamp(UserBehavior userBehavior) {
        // 原始数据单位秒,将其转成毫秒
        return userBehavior.timestamp * 1000;
      }
    });

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、加购、购买、收藏各种行为的数据,但是我们只需要统计点击量,所以先使用 FilterFunction 将点击行为数据过滤出来。

DataStream<UserBehavior> pvData = timedData
    .filter(new FilterFunction<UserBehavior>() {
      @Override
      public boolean filter(UserBehavior userBehavior) throws Exception {
        // 过滤出只有点击的数据
        return userBehavior.behavior.equals("pv");
      }
    });

窗口统计点击量

由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

DataStream<ItemViewCount> windowedData = pvData
    .keyBy("itemId")
    .timeWindow(Time.minutes(60), Time.minutes(5))
    .aggregate(new CountAgg(), new WindowResultFunction());

我们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一起计算要高效地多。aggregate()方法的第一个参数用于

这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long add(UserBehavior userBehavior, Long acc) {
    return acc + 1;
  }

  @Override
  public Long getResult(Long acc) {
    return acc;
  }

  @Override
  public Long merge(Long acc1, Long acc2) {
    return acc1 + acc2;
  }
}

.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出。

/** 用于输出窗口的结果 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

  @Override
  public void apply(
      Tuple key,  // 窗口的主键,即 itemId
      TimeWindow window,  // 窗口
      Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
      Collector<ItemViewCount> collector  // 输出类型为 ItemViewCount
  ) throws Exception {
    Long itemId = ((Tuple1<Long>) key).f0;
    Long count = aggregateResult.iterator().next();
    collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
  }
}

/** 商品点击量(窗口操作的输出类型) */
public static class ItemViewCount {
  public long itemId;     // 商品ID
  public long windowEnd;  // 窗口结束时间戳
  public long viewCount;  // 商品的点击量

  public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
    ItemViewCount result = new ItemViewCount();
    result.itemId = itemId;
    result.windowEnd = windowEnd;
    result.viewCount = viewCount;
    return result;
  }
}

现在我们得到了每个商品在每个窗口的点击量的数据流。

TopN 计算最热门商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。

DataStream<String> topItems = windowedData
    .keyBy("windowEnd")
    .process(new TopNHotItems(3));  // 求点击量前3名的商品

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,

在 processElement 方法中,每当收到一条数据(ItemViewCount),我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了windowEnd+1的 Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了 ListState<ItemViewCount> 来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

  private final int topSize;

  public TopNHotItems(int topSize) {
    this.topSize = topSize;
  }

  // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
  private ListState<ItemViewCount> itemState;

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // 状态的注册
    ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
        "itemState-state",
        ItemViewCount.class);
    itemState = getRuntimeContext().getListState(itemsStateDesc);
  }

  @Override
  public void processElement(
      ItemViewCount input,
      Context context,
      Collector<String> collector) throws Exception {

    // 每条数据都保存到状态中
    itemState.add(input);
    // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
    context.timerService().registerEventTimeTimer(input.windowEnd + 1);
  }

  @Override
  public void onTimer(
      long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // 获取收到的所有商品点击量
    List<ItemViewCount> allItems = new ArrayList<>();
    for (ItemViewCount item : itemState.get()) {
      allItems.add(item);
    }
    // 提前清除状态中的数据,释放空间
    itemState.clear();
    // 按照点击量从大到小排序
    allItems.sort(new Comparator<ItemViewCount>() {
      @Override
      public int compare(ItemViewCount o1, ItemViewCount o2) {
        return (int) (o2.viewCount - o1.viewCount);
      }
    });
    // 将排名信息格式化成 String, 便于打印
    StringBuilder result = new StringBuilder();
    result.append("====================================\n");
    result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
    for (int i=0;i<topSize;i++) {
      ItemViewCount currentItem = allItems.get(i);
      // No1:  商品ID=12224  浏览量=2413
      result.append("No").append(i).append(":")
            .append("  商品ID=").append(currentItem.itemId)
            .append("  浏览量=").append(currentItem.viewCount)
            .append("\n");
    }
    result.append("====================================\n\n");

    out.collect(result.toString());
  }
}

打印输出

最后一步我们将结果打印输出到控制台,并调用env.execute执行任务。

topItems.print();
env.execute("Hot Items Job");

运行程序

直接运行 main 函数,就能看到不断输出的每个时间点的热门商品ID。

总结

本文的完整代码可以通过 GitHub 访问到。本文通过实现一个“实时热门商品”的案例,学习和实践了 Flink 的多个核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的实现。希望本文能加深大家对 Flink 的理解,帮助大家解决实战上遇到的问题。

 

来源: https://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

Flink 原理与实现:Table & SQL API

Flink 已经拥有了强大的 DataStream/DataSet API,可以基本满足流计算和批计算中的所有需求。为什么还需要 Table & SQL API 呢?

首先 Table API 是一种关系型API,类 SQL 的API,用户可以像操作表一样地操作数据,非常的直观和方便。用户只需要说需要什么东西,系统就会自动地帮你决定如何最高效地计算它,而不需要像 DataStream 一样写一大堆 Function,优化还得纯靠手工调优。另外,SQL 作为一个“人所皆知”的语言,如果一个引擎提供 SQL,它将很容易被人们接受。这已经是业界很常见的现象了。值得学习的是,Flink 的 Table API 与 SQL API 的实现,有 80% 的代码是共用的。所以当我们讨论 Table API 时,常常是指 Table & SQL API。

Table & SQL API 还有另一个职责,就是流处理和批处理统一的API层。Flink 在runtime层是统一的,因为Flink将批任务看做流的一种特例来执行,这也是 Flink 向外鼓吹的一点。然而在编程模型上,Flink 却为批和流提供了两套API (DataSet 和 DataStream)。为什么 runtime 统一,而编程模型不统一呢? 在我看来,这是本末倒置的事情。用户才不管你 runtime 层是否统一,用户更关心的是写一套代码。这也是为什么现在 Apache Beam 能这么火的原因。所以 Table & SQL API 就扛起了统一API的大旗,批上的查询会随着输入数据的结束而结束并生成有限结果集,流上的查询会一直运行并生成结果流。Table & SQL API 做到了批与流上的查询具有同样的语法,因此不用改代码就能同时在批和流上跑。

聊聊历史

Table API 始于 Flink 0.9,Flink 0.9 是一个类库百花齐放的版本,众所周知的 Table API, Gelly, FlinkML 都是在这个版本加进去的。Flink 0.9 大概是在2015年6月正式发布的,在 Flink 0.9 发布之前,社区对 SQL 展开过好几次争论,不过当时社区认为应该首先完善 Table API 的功能,再去搞SQL,如果两头一起搞很容易什么都做不好。而且在整个Hadoop生态圈中已经有大量的所谓 “SQL-on-Hadoop” 的解决方案,譬如 Apache Hive , Apache Drill , Apache Impala 。”SQL-on-Flink”的事情也可以像 Hadoop 一样丢给其他社区去搞。

不过,随着 Flink 0.9 的发布,意味着抽象语法树、代码生成、运行时函数等都已经成熟,这为SQL的集成铺好了前进道路。另一方面,用户对 SQL 的呼声越来越高。2015年下半年,Timo 大神也加入了 dataArtisans,于是对Table API的改造开始了。2016 年初的时候,改造基本上完成了。我们也是在这个时间点发现了 Table API 的潜力,并加入了社区。经过这一年的努力,Flink 已经发展成 Apache 中最火热的项目之一,而 Flink 中最活跃的类库目前非 Table API 莫属。这其中离不开国内公司的支持,Table API 的贡献者绝大多数都来自于阿里巴巴和华为,并且主导着 Table API 的发展方向,这是非常令国人自豪的。而我在社区贡献了一年后,幸运地成为了 Flink Committer。

Table API & SQL 长什么样?

这里不会详细介绍 Table API & SQL 的使用,只是做一个展示。更多使用细节方面的问题请访问 官网文档 。

下面这个例子展示了如何用 Table API 处理温度传感器数据。计算每天每个以 room 开头的location的平均温度。例子中涉及了如何使用window,event-time等。

val sensorData: DataStream[(String, Long, Double)] = ???

// convert DataSet into Table
val sensorTable: Table = sensorData
 .toTable(tableEnv, 'location, 'time, 'tempF)

// define query on Table
val avgTempCTable: Table = sensorTable 
 .window(Tumble over 1.day on 'rowtime as 'w) 
 .groupBy('location, 'w)
 .select('w.start as 'day, 'location, (('tempF.avg - 32) * 0.556) as 'avgTempC)
 .where('location like "room%")

下面的例子是展示了如何用 SQL 来实现。

val sensorData: DataStream[(String, Long, Double)] = ???

// register DataStream
tableEnv.registerDataStream("sensorData", sensorData, 'location, ’time, 'tempF)

// query registered Table
val avgTempCTable: Table = tableEnv.sql("""
 SELECT FLOOR(rowtime() TO DAY) AS day, location, 
 AVG((tempF - 32) * 0.556) AS avgTempC
 FROM sensorData
 WHERE location LIKE 'room%'
 GROUP BY location, FLOOR(rowtime() TO DAY) """)

Table API & SQL 原理

Flink 非常明智,没有像Spark那样重复造轮子(Spark Catalyst),而是将 SQL 校验、SQL 解析以及 SQL 优化交给了 Apache Calcite 。Calcite 在其他很多开源项目里也都应用到了,譬如Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架构中处于核心的地位,如下图所示。

新的架构中,构建抽象语法树的事情全部交给了 Calcite 去做。SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。

以上面的温度计代码为样例,Table API 和 SQL 的转换流程如下,绿色的节点代表 Flink Table Nodes,蓝色的节点代表 Calcite Logical Nodes。最终都转化成了相同的 Logical Plan 表现形式。

之后会进入优化器,Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。这里的优化规则分为两类,一类是Calcite提供的内置优化规则(如条件下推,剪枝等),另一类是是将Logical Node转变成 Flink Node 的规则。这两类规则的应用体现为下图中的①和②步骤,这两步骤都属于 Calcite 的优化阶段。得到的 DataStream Plan 封装了如何将节点翻译成对应 DataStream/DataSet 程序的逻辑。步骤③就是将不同的 DataStream/DataSet Node 通过代码生成(CodeGen)翻译成最终可执行的 DataStream/DataSet 程序。

代码生成是 Table API & SQL 中最核心的一块内容。表达式、条件、内置函数等等是需要CodeGen出具体的Function 代码的,这部分跟Spark SQL的结构很相似。CodeGen 出的Function以字符串的形式存在。在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。

Table API & SQL 现状

目前 Table API 对于批和流都已经支持了基本的Selection, Projection, Union,以及 Window 操作(包括固定窗口、滑动窗口、会话窗口)。SQL 的话由于 Calcite 在最近的版本中才支持 Window 语法,所以目前 Flink SQL 还不支持 Window 的语法。并且 Table API 和 SQL 都支持了UDF,UDTF,UDAF(开发中)。

Table API & SQL 未来

  1. Dynamic TablesDynamic Table 就是传统意义上的表,只不过表中的数据是会变化更新的。Flink 提出 Stream <–> Dynamic Table 之间是可以等价转换的。不过这需要引入Retraction机制。有机会的话,我会专门写一篇文章来介绍。
  2. Joins包括了支持流与流的 Join,以及流与表的 Join。
  3. SQL 客户端目前 SQL 是需要内嵌到 Java/Scala 代码中运行的,不是纯 SQL 的使用方式。未来需要支持 SQL 客户端执行提交 SQL 纯文本运行任务。
  4. 并行度设置目前 Table API & SQL 是无法设置并行度的,这使得 Table API 看起来仍像个玩具。

在我看来,Flink 的 Table & SQL API 是走在时代前沿的,在很多方面在做着定义业界标准的事情,比如 SQL 上Window的表达,时间语义的表达,流和批语义的统一等。在我看来,SQL 拥有更天然的流与批统一的特性,并且能够自动帮用户做很多SQL优化(下推、剪枝等),这是 Beam 所做不到的地方。当然,未来如果 Table & SQL API 发展成熟的话,剥离出来作为业界标准的流与批统一的API也不是不可能(叫BeamTable,BeamSQL ?),哈哈。这也是我非常看好 Table & SQL API,认为其大有潜力的一个原因。当然就目前来说,需要走的路还很长,Table API 现在还只是个玩具。

 

来源:   http://wuchong.me/blog/2017/03/30/flink-internals-table-and-sql-api/