主要内容:介绍Flink中的RichFunction函数。

上一篇文章:Flink——实战之MySQL Sink在介绍MySQL Sink时,强调了这里要选择继承RichSinkFunction接口,而非SinkFunction接口:

streaming.addSink(new MyJdbcSink).setParallelism(1)
class MyJdbcSink extends RichSinkFunction[Person] {...}

从而实现将数据从Flink写入MySQL。其实,streaming.addSink()函数明明需要传入一个SinkFunction,那为何传入RichSinkFunction也可实现自定义Sink呢?

在IDEA中展示RichSinkFunction的继承关系层次结构图:

蓝色实线:extends继承一个抽象类

绿色实线:extends继承一个类

绿色虚线:implements实现一个接口

Function接口是所有用户自定义函数的base interface,RichFunction和SinkFunction都是继承Function的接口。可以看到,SinkFunction和RichFunction接口中有各有不同的方法,而后者的方法更丰富一些,功能也就越多,所以称为“富函数”。

RichFunction接口中各函数介绍

An base interface for all rich user-defined functions. This class defines methods for the life cycle of the functions, as well as methods to access the context in which the functions are executed.

源码给出的解释如上。大意:RichFunction是所有用户自定义富函数的base interface。RichFunction与常规函数的不同在于:它拥有一些具有生命周期的方法,并可以获取函数运行时的上下文。下面解释一下各个方法的作用:

class MyRichFunction extends RichFunction[String] {
    override def open(parameters: Configuration): Unit = ???

    override def close(): Unit = ???

    override def getRuntimeContext: RuntimeContext = ???

    override def getIterationRuntimeContext: IterationRuntimeContext = ???

    override def setRuntimeContext(t: RuntimeContext): Unit = ???
  }

RichFunction:

  • open():在函数调用前,open()函数先被调用,用于初始化操作;
  • close():生命周期中最后一个被调用的方法,做一些清理工作;
  • setRuntimeContext(RuntimeContext):设置运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据;
  • getRuntimeContext():获取运行时上下文;
  • getIterationRuntimeContext():获取迭代运行时上下文。

SinkFunction:

  • invoke():将传入的数据写入Sink。需要强调的是,每传入一个数据就要调用一次该函数。若Sink是MySQl,每次都要创建一个连接。

由此可见,RichFunction可以实现更复杂的功能,并能提高性能,节省资源。上面说到的RichSinkFunction类之所以有上述方法,是因为从RichFunction接口继承而来。其实,在Flink当中,所有函数类都有继承或者间接继承RichFunction,都有其Rich版本。

RichFunction实现map算子

下面举一个简单例子:Flink从文件读取数据,经过map算子处理后,取出每行中的name字段,程序结束时输出所统计到的人数。

文本数据:

小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3

main函数:

def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
    env.setParallelism(1)

    inputStream.map(new MyRichMapFunction).print("name")

    env.execute("RichMapFunction test")
  }

自定义RichMapFunction函数:

class MyRichMapFunction extends RichMapFunction[String, String] {
    var count: Int = 0
    var startTime: Long = _
    val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS")

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      startTime = System.currentTimeMillis()
      println("open函数调用时间:" + timeFormat.format(startTime))
      println("--------------------------------------------")
    }

    override def map(value: String): String = {
      println("map函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
      count += 1
      value.split(",")(0)
    }

    override def close(): Unit = {
      println("--------------------------------------------")
      println("close函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
      super.close()
      println("共统计个数:" + count)
    }
  }

输出:

open函数调用时间:2020-04-19 19:00:19.0331
--------------------------------------------
map函数调用时间:2020-04-19 19:00:19.0616
name> 小爱
map函数调用时间:2020-04-19 19:00:19.0616
name> 小黑
map函数调用时间:2020-04-19 19:00:19.0616
name> 小红
map函数调用时间:2020-04-19 19:00:19.0616
name> 小明
map函数调用时间:2020-04-19 19:00:19.0616
name> 小兰
--------------------------------------------
close函数调用时间:2020-04-19 19:00:19.0616
共统计个数:5

验证了RichFunction中open()、map()、close()函数执行的顺序以及在函数生命周期内可获取上下文变量count。需要注意:程序中添加了env.setParallelism(1),即指定了只使用一个线程。若不设定,默认是取机器的CPU核数。且每调用一次MyRichMapFunction,就会调用一次open()map()close()函数。