Contents:

  • DataSource
  • DataStream Transformations
  • DataSink

For the development workflow of a Flink program and a worked example, see: "Flink: Building a Flink Application from Scratch" (从零搭建Flink应用)

DataSource

A DataSource is where a Flink program reads its input. Sources are created through methods on StreamExecutionEnvironment.

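Built-in sources

  • File sources:
    • readTextFile(path): reads a text file directly;
    • readFile(fileInputFormat, path): reads a file using the given input format;
    • readFile(fileInputFormat, path, watchType, interval, pathFilter): additionally specifies how the path is monitored, the interval (in milliseconds) at which it is checked for changes, and a filter on file paths. watchType has two modes:
      • PROCESS_CONTINUOUSLY: the path is checked periodically, and when a file is modified its entire contents are re-processed. Because appending to a file causes the whole file to be read again, this mode can break exactly-once semantics.
      • PROCESS_ONCE: the path is scanned once, the files found are read, and the source then exits. No further checkpoints are taken after the source closes, which can slow recovery after a failure.
  • Socket source:
    • socketTextStream(hostname, port): reads text from a socket;
  • Collection sources: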
    • fromCollection(Seq)
    • fromCollection(Iterator)
    • fromElements(elements: _*)
    • fromParallelCollection(SplittableIterator)
    • generateSequence(from, to)
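
The built-in sources above can be combined in one job. The following is a minimal sketch, not a definitive setup. It assumes a Flink 1.x release where the Scala DataStream API is available, and the input path /tmp/flink-input is a hypothetical placeholder that must exist before the job starts.

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object BuiltInSourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Collection sources: handy for local experiments.
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3)
    val words: DataStream[String] = env.fromCollection(Seq("flink", "stream"))

    // File source. The path is an assumption made for this sketch.
    val inputPath = "/tmp/flink-input"
    val format = new TextInputFormat(new Path(inputPath))

    // PROCESS_ONCE scans the path a single time and then finishes.
    // PROCESS_CONTINUOUSLY would re-check it every `interval` milliseconds.
    val lines: DataStream[String] =
      env.readFile(format, inputPath, FileProcessingMode.PROCESS_ONCE, 1000L)

    // A socket source would look like this (needs a server listening on the port):
    // val socketLines = env.socketTextStream("localhost", 9999)

    numbers.print()
    words.print()
    lines.print()

    env.execute("built-in sources sketch")
  }
}
```

With PROCESS_ONCE the job ends once all sources are exhausted. Switching to PROCESS_CONTINUOUSLY keeps the job running until it is cancelled.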

External sources

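Streaming applications usually take most of their data from external third-party systems. For this purpose Flink provides a rich set of connectors built on the SourceFunction interface, and custom sources can be implemented in the same way.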
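
As an illustration of a custom source, here is a minimal sketch that implements SourceFunction directly. The class name CounterSource and its limit parameter are hypothetical and exist only for this example. A production source would also need to handle state and checkpointing.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical source: emits the numbers 1..limit and then finishes.
class CounterSource(limit: Long) extends SourceFunction[Long] {

  // Flipped by cancel() so that the emit loop can stop early.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var next = 1L
    while (running && next <= limit) {
      // Emit under the checkpoint lock so that emitting a record and
      // advancing the counter happen atomically with respect to checkpoints.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(next)
        next += 1
      }
    }
  }

  override def cancel(): Unit = running = false
}

object CustomSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new CounterSource(10L)).print()
    env.execute("custom source sketch")
  }
}
```

The source is registered with env.addSource, which is also how the packaged connectors based on SourceFunction are attached.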

DataStream Transformations

Each entry gives the operator, the stream types it converts between, and an example:

  • map (DataStream → DataStream)
      dataStream.map { x => x * 2 }
  • flatMap (DataStream → DataStream)
      dataStream.flatMap { str => str.split(" ") }
  • filter (DataStream → DataStream)
      dataStream.filter { _ != 0 }
  • keyBy (DataStream → KeyedStream)
      dataStream.keyBy("someKey") // Key by field "someKey"
      dataStream.keyBy(0) // Key by the first element of a Tuple
  • reduce (KeyedStream → DataStream)
      keyedStream.reduce { _ + _ }
  • fold (KeyedStream → DataStream)
      val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
  • aggregations (KeyedStream → DataStream)
      keyedStream.sum(0)
      keyedStream.sum("key")
      keyedStream.min(0)
      keyedStream.min("key")
      keyedStream.max(0)
      keyedStream.max("key")
      keyedStream.minBy(0)
      keyedStream.minBy("key")
      keyedStream.maxBy(0)
      keyedStream.maxBy("key")
  • window (KeyedStream → WindowedStream)
      dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
  • windowAll (DataStream → AllWindowedStream)
      dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
  • Window Apply (WindowedStream → DataStream, AllWindowedStream → DataStream)
      windowedStream.apply { WindowFunction }
      // applying an AllWindowFunction on non-keyed window stream
      allWindowedStream.apply { AllWindowFunction }
  • Window Reduce (WindowedStream → DataStream)
      windowedStream.reduce { _ + _ }
  • Window Fold (WindowedStream → DataStream)
      val result: DataStream[String] = windowedStream.fold("start")((str, i) => { str + "-" + i })
  • Aggregations on windows (WindowedStream → DataStream)
      windowedStream.sum(0)
      windowedStream.sum("key")
      windowedStream.min(0)
      windowedStream.min("key")
      windowedStream.max(0)
      windowedStream.max("key")
      windowedStream.minBy(0)
      windowedStream.minBy("key")
      windowedStream.maxBy(0)
      windowedStream.maxBy("key")
  • union (DataStream* → DataStream)
      dataStream.union(otherStream1, otherStream2, ...)
  • Window Join (DataStream, DataStream → DataStream)
      dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply { ... }
  • Window CoGroup (DataStream, DataStream → DataStream)
      dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply {}
  • connect (DataStream, DataStream → ConnectedStreams)
      val someStream : DataStream[Int] = ...
      val otherStream : DataStream[String] = ...
      val connectedStreams = someStream.connect(otherStream)
  • CoMap, CoFlatMap (ConnectedStreams → DataStream)
      connectedStreams.map( (_ : Int) => true, (_ : String) => false )
      connectedStreams.flatMap( (_ : Int) => List(true), (_ : String) => List(false) )
  • split (DataStream → SplitStream)
      val split = someDataStream.split(
        (num: Int) => (num % 2) match {
          case 0 => List("even")
          case 1 => List("odd")
        }
      )
  • select (SplitStream → DataStream)
      val even = split select "even"
      val odd = split select "odd"
      val all = split.select("even","odd")
  • iterate (DataStream → IterativeStream → DataStream)
      initialStream.iterate {
        iteration => {
          val iterationBody = iteration.map { /*do something*/ }
          (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
        }
      }
  • Extract Timestamps (DataStream → DataStream)
      stream.assignTimestamps { timestampExtractor }
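
To show how several of these operators chain together, here is a minimal word-count sketch. It assumes a Flink 1.x release with the Scala DataStream API. The input sentences are made up for the example, and keyBy uses a key-selector function rather than a field index.

```scala
import org.apache.flink.streaming.api.scala._

object TransformationsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample input chosen for this sketch only.
    val lines: DataStream[String] =
      env.fromElements("to be or not to be", "that is the question")

    val counts: DataStream[(String, Int)] = lines
      .flatMap { line => line.split(" ") } // DataStream → DataStream: one record per word
      .filter { _.nonEmpty }               // DataStream → DataStream: drop empty tokens
      .map { word => (word, 1) }           // DataStream → DataStream: pair each word with a count of 1
      .keyBy(_._1)                         // DataStream → KeyedStream: partition by the word
      .sum(1)                              // KeyedStream → DataStream: rolling sum of tuple field 1

    counts.print()
    env.execute("transformations sketch")
  }
}
```

Because sum is a rolling aggregation, the job emits an updated count each time a word arrives rather than a single final total per word.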

DataSink

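After the transformations above, the program holds its final result stream. This result usually needs to be written to external storage or forwarded to a downstream message broker. In Flink, writing a DataStream to an external system is called a DataSink operation. Sinks are attached by calling methods on the DataStream.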

Built-in sinks

  • writeAsText() / TextOutputFormat
  • writeAsCsv(...) / CsvOutputFormat
  • print() / printToErr()
  • writeUsingOutputFormat() / FileOutputFormat
  • writeToSocket
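
The sketch below attaches a few of the built-in sinks listed above and, for comparison, a custom one added with addSink. It assumes a Flink 1.x release in which writeAsText and the single-argument SinkFunction.invoke are still available. The output path /tmp/flink-output and the class PrefixedStdoutSink are hypothetical.

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical custom sink: prints each record to standard output with a prefix.
class PrefixedStdoutSink extends SinkFunction[(String, Int)] {
  override def invoke(value: (String, Int)): Unit =
    println(s"result> ${value._1}=${value._2}")
}

object SinksSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample records chosen for this sketch only.
    val results: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("b", 2), ("c", 3))

    results.print()                           // standard output
    results.printToErr()                      // standard error
    results.writeAsText("/tmp/flink-output")  // text files; the path is an assumption
    results.addSink(new PrefixedStdoutSink)   // custom sink via addSink

    env.execute("sinks sketch")
  }
}
```

A stream can feed several sinks at once, as shown here. Each sink receives every record of the stream.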

External sinks