Flink: the DataStream API
Topics covered:
- DataSource
- DataStream Transformations
- DataSink
For the development workflow of a Flink program and a concrete example, see "Flink: Building a Flink Application from Scratch".
DataSource
A DataSource is where a Flink program reads its data from. Sources are configured through methods on the StreamExecutionEnvironment.
Built-in sources
- File sources:
readTextFile(path): reads a text file line by line;
readFile(fileInputFormat, path): reads a file using the given input format;
readFile(fileInputFormat, path, watchType, interval, pathFilter): additionally specifies the watch mode, the interval at which the path is checked for changes, and a filter on file paths. watchType has two modes:
PROCESS_CONTINUOUSLY: the path is monitored continuously; when a file is modified, its entire contents are re-read into Flink, so this mode cannot provide Exactly Once semantics;
PROCESS_ONCE: the path is scanned exactly once, the data currently in it is read, and the source then exits; this mode preserves Exactly Once semantics.
- Socket source:
socketTextStream(hostname, port): reads data arriving on a socket;
- Collection sources:
fromCollection(Seq)
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator)
generateSequence(from, to)
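The built-in sources above can be sketched as follows. This assumes a local Flink setup; the file path, host, and port are placeholders, not values from the original text.

```scala
import org.apache.flink.streaming.api.scala._

object BuiltInSources {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // File source: read a text file line by line (placeholder path).
    val fromFile = env.readTextFile("/tmp/input.txt")

    // Socket source: one record per line arriving on the socket (placeholder host/port).
    val fromSocket = env.socketTextStream("localhost", 9999)

    // Collection sources: handy for local testing.
    val fromColl  = env.fromCollection(Seq("a", "b", "c"))
    val fromElems = env.fromElements(1, 2, 3)
    val sequence  = env.generateSequence(1, 100)

    fromElems.print()
    env.execute("BuiltInSources")
  }
}
```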
外部数据源
对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此,Flink通过实现SourceFunction
定义了丰富的第三方数据连接器(支持自定义数据源):
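A minimal custom source built on SourceFunction might look like the sketch below. The class name and the 1..100 range are illustrative; a production source would also integrate with checkpointing (e.g. CheckpointedFunction), which is omitted here.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits the integers 1..100 and then finishes.
class CountSource extends SourceFunction[Int] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    var i = 1
    while (running && i <= 100) {
      ctx.collect(i) // emit one record downstream
      i += 1
    }
  }

  // Called by Flink when the job is cancelled.
  override def cancel(): Unit = running = false
}
```

It is attached like any other source: `env.addSource(new CountSource)`.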
DataStream Transformations
Operator | Transformation | Example
---|---|---
map | DataStream → DataStream | dataStream.map { x => x * 2 } |
flatMap | DataStream → DataStream | dataStream.flatMap { str => str.split(" ") } |
filter | DataStream → DataStream | dataStream.filter { _ != 0 } |
keyBy | DataStream → KeyedStream | dataStream.keyBy("someKey") /* key by field "someKey" */ dataStream.keyBy(0) /* key by the first element of a Tuple */ |
reduce | KeyedStream → DataStream | keyedStream.reduce { _ + _ } |
fold | KeyedStream → DataStream | val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i }) |
aggregations | KeyedStream → DataStream | keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key") |
window | KeyedStream → WindowedStream | dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data |
windowAll | DataStream → AllWindowedStream | dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data |
Window Apply | WindowedStream → DataStream AllWindowedStream → DataStream | windowedStream.apply { WindowFunction } allWindowedStream.apply { AllWindowFunction } // an AllWindowFunction applies to a non-keyed window stream |
Window Reduce | WindowedStream → DataStream | windowedStream.reduce { _ + _ } |
Window Fold | WindowedStream → DataStream | val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) |
Aggregations on windows | WindowedStream → DataStream | windowedStream.sum(0) windowedStream.sum("key") windowedStream.min(0) windowedStream.min("key") windowedStream.max(0) windowedStream.max("key") windowedStream.minBy(0) windowedStream.minBy("key") windowedStream.maxBy(0) windowedStream.maxBy("key") |
union | DataStream* → DataStream | dataStream.union(otherStream1, otherStream2, ...) |
Window Join | DataStream,DataStream → DataStream | dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... } |
Window CoGroup | DataStream,DataStream → DataStream | dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {} |
connect | DataStream,DataStream → ConnectedStreams | someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) |
CoMap, CoFlatMap | ConnectedStreams → DataStream | connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) |
split | DataStream → SplitStream | val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } ) |
select | SplitStream → DataStream | val even = split select "even" val odd = split select "odd" val all = split.select("even","odd") |
iterate | DataStream → IterativeStream → DataStream | initialStream.iterate { iteration => { val iterationBody = iteration.map { /* do something */ } (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } |
Extract Timestamps | DataStream → DataStream | stream.assignTimestamps { timestampExtractor } |
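Several of the transformations in the table chain naturally into a pipeline. Below is a minimal word-count sketch (flatMap → filter → map → keyBy → sum); the socket host and port are placeholders.

```scala
import org.apache.flink.streaming.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts = env
      .socketTextStream("localhost", 9999)   // DataSource
      .flatMap(_.toLowerCase.split("\\s+"))  // DataStream -> DataStream
      .filter(_.nonEmpty)                    // drop empty tokens
      .map((_, 1))                           // (word, 1) pairs
      .keyBy(0)                              // DataStream -> KeyedStream
      .sum(1)                                // rolling count per word

    counts.print()                           // DataSink
    env.execute("WordCount")
  }
}
```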
DataSink
The transformation steps eventually produce a final result stream. That result usually needs to be written to an external storage system or handed to a downstream message broker. In Flink, writing a DataStream out to an external system is called a DataSink operation. Sinks are attached directly to a DataStream (for example via the write* methods or addSink).
Built-in sinks
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
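The two common ways to attach a sink, a built-in convenience method and a custom SinkFunction via addSink, can be sketched as below. The output path is a placeholder, and the anonymous sink only prints, standing in for a real external system.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object SinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements(1, 2, 3)

    // Built-in sink: write each element's toString to a text file (placeholder path).
    stream.writeAsText("/tmp/flink-out")

    // Custom sink: implement SinkFunction and attach it with addSink.
    stream.addSink(new SinkFunction[Int] {
      override def invoke(value: Int): Unit =
        println(s"sinking $value") // stand-in for writing to an external system
    })

    env.execute("SinkExample")
  }
}
```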