Main topic: writing data from Flink to Redis, i.e. implementing a Redis sink.

Preparation

The Flink Redis Connector provides a sink for writing data to Redis. To use the connector, add the following dependency to your project:

<!-- redis connector -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
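
If the project is built with sbt rather than Maven, the equivalent dependency would be the single line below (a minimal sketch, assuming the same Scala 2.11 build as above):

libraryDependencies += "org.apache.bahir" % "flink-connector-redis_2.11" % "1.0"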

Implementation

The core of the code is this one line:

streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))

You need two arguments here:

  • redisConf: the Redis connection configuration
// Configure the Redis connection
val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("host")
    .setPort(6379)
    .setTimeout(30000)
    .build()
  • MyRedisMapper: a custom RedisMapper (see the note after this snippet)
// Custom RedisMapper
class MyRedisMapper extends RedisMapper[Person] {
    override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
    }
    // redis field
    override def getKeyFromData(t: Person): String = {
        t.name
    }
    // redis value
    override def getValueFromData(t: Person): String = {
        t.age + ":" + t.gender + ":" + t.height
    }
}
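
With HSET, every Person becomes one field of the Redis hash named by REDIS_KEY: getKeyFromData supplies the hash field (the person's name) and getValueFromData supplies the value. If you instead wanted each record written as a plain Redis string keyed by name, only getCommandDescription would need to change; a minimal, hypothetical sketch:

// Hypothetical variant: store each Person with SET instead of HSET
override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
}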

The complete code is as follows:

package org.ourhome.streamapi

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * @Author Do
 * @Date 2020/4/18 21:19
 */
object RedisSinkTest {
  private val REDIS_KEY = "person_message"

  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val runType: String = params.get("runtype")
    println("runType: " + runType)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5s with exactly-once semantics; state goes to a local filesystem backend
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///D:/Work/Code/flinkdev/src/main/resources/checkpoint"))

    val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
    // Parse each line of inputStream and wrap it into a Person
    val streaming: DataStream[Person] = inputStream.map(line => {
      println(line)
      val strings: Array[String] = line.split(",")
      Person(strings(0).trim, strings(1).trim.toInt, strings(2).trim, strings(3).trim.toFloat)
    })
    // Configure the Redis connection
    val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("host")
      .setPort(6379)
      .setTimeout(30000)
      .build()
    streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))

    env.execute("Redis Sink")
  }

  case class Person (name: String, age: Int, gender: String, height: Float)
  // Custom RedisMapper
  class MyRedisMapper extends RedisMapper[Person] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
    }
    // redis field
    override def getKeyFromData(t: Person): String = {
      t.name
    }
    // redis value
    override def getValueFromData(t: Person): String = {
      t.age + ":" + t.gender + ":" + t.height
    }

  }

}
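
In a real deployment the connection usually needs more than a host and port. The FlinkJedisPoolConfig builder also exposes options such as setPassword and setDatabase; a minimal sketch (using the same import as above, with placeholder values that are not part of this example):

// Hypothetical configuration for a password-protected Redis using database 1
val securedConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
  .setHost("host")
  .setPort(6379)
  .setPassword("your-password") // assumption: the server requires AUTH
  .setDatabase(1)               // assumption: write to db 1 instead of the default db 0
  .setTimeout(30000)
  .build()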

The customdata.txt file:

小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3

Data in Redis:

row key value
1 小明 20:man:180.2
2 小红 22:woman:178.4
3 小黑 18:man:192.9
4 小兰 19:woman:188.0
5 小爱 30:woman:177.3
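
These rows are the fields of the person_message hash; they can be inspected from redis-cli with HGETALL person_message (assuming the default database).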