批处理经常要解决的问题是将两个数据源做关联Join操作。比如,很多手机APP都有一个用户数据源User,同时APP会记录用户的行为,我们称之为Behavior,两个表按照userId来进行Join。在流处理场景下,Flink也支持了Join,只不过Flink是在一个时间窗口上来进行两个表的Join。
Join示例图
目前,Flink支持了两种Join:Window Join(窗口连接)和Interval Join(时间间隔连接。
Window Join
从名字中能猜到,Window Join主要在Flink的窗口上进行操作,它将两个流中落在相同窗口的元素按照某个Key进行Join。一个Window Join的大致骨架结构为:
input1.join(input2) .where() <- input1使用哪个字段作为Key .equalTo() <- input2使用哪个字段作为Key .window() <- 指定WindowAssigner [.trigger()] <- 指定Trigger(可选) [.evictor()] <- 指定Evictor(可选) .apply() <- 指定JoinFunction
下图展示了Join的大致过程。两个输入数据流先分别按Key进行分组,然后将元素划分到窗口中。窗口的划分需要使用WindowAssigner来定义,这里可以使用Flink提供的滚动窗口、滑动窗口或会话窗口等默认的WindowAssigner。随后两个数据流中的元素会被分配到各个窗口上,也就是说一个窗口会包含来自两个数据流的元素。相同窗口内的数据会以INNER JOIN的语义来相互关联,形成一个数据对。当窗口的时间结束,Flink会调用JoinFunction来对窗口内的数据对进行处理。当然,我们也可以使用Trigger或Evictor做一些自定义优化,他们的使用方法和普通窗口的使用方法一样。
Join的大致流程
接下来我们重点分析一下两个数据流是如何INNER JOIN的:
窗口内数据INNER JOIN示意图
一般滴,INNER JOIN只对两个数据源都出现的元素做Join,形成一个数据对,即数据源input1中的某个元素与数据源input2中的所有元素逐个配对。当数据源某个窗口内没数据时,比如图中的第三个窗口,Join的结果也是空的。
class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] { override def join(input1: (String, Int), input2: (String, Int)): String = { "input 1 :" + input1._2 + ", input 2 :" + input2._2 }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyJoinFunction)
上面的代码自定义了JoinFunction,并将Join结果打印出来。无论代码中演示的滚动窗口,还是滑动窗口或会话窗口,其原理都是一样的。除了JoinFunction,Flink还提供了FlatJoinFunction,其功能是输出零到多个结果。
如果INNER JOIN不能满足我们的需求,CoGroupFunction提供了更多可自定义的功能。需要注意的是,在调用时,要写成input1.coGroup(input2).where(
class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] { // 这里的类型是Java的Iterable,需要引用 collection.JavaConverters._ 并转成Scala override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = { input1.asScala.foreach(element => out.collect("input1 :" + element.toString())) input2.asScala.foreach(element => out.collect("input2 :" + element.toString())) }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyCoGroupFunction)
Interval Join
与Window Join不同,Interval Join不依赖Flink的WindowAssigner,而是根据一个时间间隔(Interval)界定时间。Interval需要一个时间下界(lower bound)和上界(upper bound),如果我们将input1和input2进行Interval Join,input1中的某个元素为input1.element1,时间戳为input1.element1.ts,那么一个Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在这个时间段内的元素将会和input1.element1组成一个数据对。用数学公式表达为,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN语义,两两组合在一起。上下界可以是正数也可以是负数。
注意,目前Flink(1.9)的Interval Join只支持Event Time语义。
Interval Join示意图
下面的代码展示了如何对两个数据流进行Interval Join:
class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] { override def processElement(input1: (String, Long, Int), input2: (String, Long, Int), context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context, out: Collector[String]): Unit = { out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString) }}// 数据流有三个字段:(key, 时间戳, 数值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .process(new MyProcessFunction)
默认的时间间隔是包含上下界的,我们可以使用.lowerBoundExclusive() 和.upperBoundExclusive来确定是否需要包含上下界。
val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .upperBoundExclusive() .lowerBoundExclusive() .process(new MyProcessFunction)
Interval Join内部是用缓存来存储所有数据的,因此需要注意缓存数据不能太大,以免对内存造成绝大压力。
推荐阅读:甘肃消费网
-
上海这家餐厅茶点有特色,人均消费不足100元
上海这家餐厅茶点有特色,人均消费不足100元,好吃到撑解封了,上班了,一切都在回归的路上。中国加油!武汉加油!阳春三月,万物复苏,宅了这么久,是不是也想约上三五...
2020-03-18 -
我国重名最多的一座山,足足有19个相同名字,
如今我国在对一些新生儿起名字的时候,我国如今有很多人在给他们起名字的时候,有不少都是重复的,这已经不是什么新鲜事情了,而且在我国众多的一些景点中看,也是有不少重...
2020-03-18 -
北方的九寨沟,拥有华夏第一秀水,它是河南最值
我是旅图君,笔耕不辍,只为讲述旅途中更精彩的故事!一直挺喜欢河南这个地方,因为这里是中华文明的发祥地之一,从夏至宋,河南是华夏五千年政治、经济、文化的核心。因为...
2020-03-18 -
北方有这样的山水,开始我还不信,王维在这里“
河南景区,少林寺名气最大,不过被黑的不轻。单说风景漂不漂亮,在河南当属焦作云台山,《中国国家地理》曾经说过:太行山,把最美的一段给了河南。为什么这么说?因为水!...
2020-03-18 -
天下第一名山,道家追求仙境的理想之地,传说有
天下第一名山,道家追求仙境的理想之地,传说有个人得道飞升巍巍武当山,绵延八百里,它是世界文化遗产,也是集建筑、宗教、文化于一体的“天下第一名山”,吸引着世界各地...
2020-03-18 -
河南最受欢迎的人造景区,门票120元,游客络
最近几年期间,我国的经济得到了快速的发展,其不仅体现在人们的工资水平提升上,还体现在我们日常出行方式中。例如这几年的动车、高铁,已经是四通八达了,许多城市的镇里...
2020-03-18