Main topics: an analysis of the RedisSink source code, followed by a worked example of writing data from Flink to Redis.

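The Flink Redis Connector from Apache Bahir provides a sink that writes data to Redis. To use it, add the following dependency to your project (the _2.11 suffix is the Scala binary version and should match your Flink build):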

<!-- redis connector -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

1 Source Code Analysis

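First, the class hierarchy of RedisSink: RedisSink<IN> extends RichSinkFunction<IN>, which in turn extends AbstractRichFunction (an implementation of the RichFunction interface) and implements SinkFunction<IN>.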

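Because RedisSink indirectly implements the RichFunction interface, it inherits the lifecycle methods open() and close() and can access the function's runtime context through getRuntimeContext().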
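The following stdlib-only Java model illustrates what the rich-function lifecycle means for a sink. It does not use Flink. `LifecycleSink`, `runSubtask`, and `trace` are hypothetical stand-ins that only mimic the call order one parallel sink instance sees: open once, invoke once per record, close once. Flink's real signatures differ; for example, open() receives a Configuration. RedisSink relies on these same three hooks. It connects in open(), writes in invoke(), and disconnects in close().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LifecycleDemo {
    // Mimics how one parallel sink instance is driven: open once, invoke per record, close once
    static <IN> void runSubtask(LifecycleSink<IN> sink, List<IN> records) {
        sink.open();
        try {
            for (IN record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close(); // always release resources, even if a record fails
        }
    }

    // Runs a recording sink over the records and returns the observed call sequence
    static List<String> trace(List<String> records) {
        List<String> calls = new ArrayList<>();
        runSubtask(new LifecycleSink<String>() {
            @Override void open() { calls.add("open"); }
            @Override void invoke(String value) { calls.add("invoke:" + value); }
            @Override void close() { calls.add("close"); }
        }, records);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("a", "b"))); // [open, invoke:a, invoke:b, close]
    }
}

// Hypothetical stand-in for a rich sink; NOT Flink's RichSinkFunction API
abstract class LifecycleSink<IN> {
    abstract void open();          // e.g. create the Redis connection
    abstract void invoke(IN value); // called once for every record
    abstract void close();         // e.g. release the Redis connection
}
```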

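The RedisSink constructor takes two arguments:

  • FlinkJedisConfigBase: the Redis connection (pool) configuration
  • RedisMapper: maps each input record to a Redis command and a key/value pair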

public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
    // Fail fast if the connection config, the mapper, or its command description is missing
    Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
    Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
    Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");

    this.flinkJedisConfigBase = flinkJedisConfigBase;

    this.redisSinkMapper = redisSinkMapper;
    // The command and the optional additional key are fixed here once and reused for every record
    RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
    this.redisCommand = redisCommandDescription.getCommand();
    this.additionalKey = redisCommandDescription.getAdditionalKey();
}

Analysis of FlinkJedisConfigBase

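FlinkJedisClusterConfig, FlinkJedisSentinelConfig, and FlinkJedisPoolConfig all extend the abstract class FlinkJedisConfigBase. They correspond to Redis cluster mode, Sentinel mode, and single-node mode respectively. Each has a nested Builder class for setting connection parameters such as host, port, and timeout. The setters differ by mode: the cluster builder, for example, takes a set of node addresses instead of a single host. Choose the class that matches your Redis deployment.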

Analysis of RedisMapper

An example is the easiest way to see how a RedisMapper produces the Redis command and the key/value pair:

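import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Writes each Array[String] record with LPUSH: element 0 is the list key, element 1 the value
class RedisTestMapper extends RedisMapper[Array[String]] {
  // Redis command executed for every record
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.LPUSH)
  }

  // Redis key: the name of the list
  override def getKeyFromData(data: Array[String]): String = {
    data(0)
  }

  // Value pushed onto the head of the list
  override def getValueFromData(data: Array[String]): String = {
    data(1)
  }
}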
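The Java model below shows what this mapper does to a stream of records. It applies the same key/value extraction and imitates LPUSH with an in-memory deque. The `lists` map and the `lpush` and `lrangeAll` helpers are hypothetical stand-ins for Redis, not a client API, and the record contents are illustrative. LPUSH inserts at the head of the list, so records that arrive later appear first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LpushMapperDemo {
    // In-memory stand-in for Redis lists (hypothetical; not a Redis client)
    static final Map<String, Deque<String>> lists = new HashMap<>();

    // Mirrors RedisTestMapper: element 0 is the key, element 1 is the value
    static String keyOf(String[] data) { return data[0]; }
    static String valueOf(String[] data) { return data[1]; }

    // LPUSH semantics: insert the value at the head of the list
    static void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
    }

    // Whole list from head to tail, like LRANGE key 0 -1
    static List<String> lrangeAll(String key) {
        return new ArrayList<>(lists.getOrDefault(key, new ArrayDeque<>()));
    }

    public static void main(String[] args) {
        String[][] records = { {"mylist", "a"}, {"mylist", "b"}, {"mylist", "c"} };
        for (String[] r : records) {
            lpush(keyOf(r), valueOf(r));
        }
        System.out.println(lrangeAll("mylist")); // [c, b, a]
    }
}
```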

2 RedisSink Methods

RedisSink overrides three methods:

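  • open(): initializes the Redis connection for cluster, Sentinel, or single-node mode. It builds a RedisCommandsContainer (an object implementing the RedisCommandsContainer interface, which exposes the supported Redis commands) and then opens it.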
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
            this.redisCommandsContainer.open();
        } catch (Exception e) {
            LOG.error("Redis has not been properly initialized: ", e);
            throw e;
        }
    }
  • close(): closes the command container, which in effect closes the Redis connection.
    @Override
    public void close() throws IOException {
        if (redisCommandsContainer != null) {
            redisCommandsContainer.close();
        }
    }
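  • invoke(): extracts the key and value from each input record with the RedisMapper. It then runs the Redis command that was fixed by the mapper's RedisCommandDescription when the sink was constructed; the command does not depend on the input's data type. Only the following basic commands are supported, and anything else has to be implemented yourself: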
    • push (LPUSH and RPUSH)
    • sadd
    • set
    • publish
    • pfadd
    • zadd
    • zrem
    • hset
    @Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case ZREM:
                this.redisCommandsContainer.zrem(this.additionalKey, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }
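The switch hides a detail that is easy to miss. For ZADD, ZREM, and HSET, the Redis key is the additionalKey from RedisCommandDescription. The mapper's "key" becomes the hash field or sorted-set member, and for ZADD the mapper's "value" is the score. The stdlib-only Java model below reproduces that argument routing against in-memory maps. `Cmd`, `strings`, `hashes`, and `zsets` are hypothetical stand-ins, not the connector's classes. In the model the score is parsed with Double.valueOf, so a non-numeric value fails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class InvokeDispatchDemo {
    enum Cmd { SET, HSET, ZADD, ZREM }

    // In-memory stand-ins (hypothetical) for Redis strings, hashes and sorted sets
    static final Map<String, String> strings = new HashMap<>();
    static final Map<String, Map<String, String>> hashes = new HashMap<>();
    static final Map<String, Map<String, Double>> zsets = new HashMap<>();

    // Same argument routing as RedisSink.invoke() for these four commands
    static void invoke(Cmd cmd, String additionalKey, String key, String value) {
        switch (cmd) {
            case SET: // key = key, value = value
                strings.put(key, value);
                break;
            case HSET: // key = additionalKey, field = key, value = value
                hashes.computeIfAbsent(additionalKey, k -> new HashMap<>()).put(key, value);
                break;
            case ZADD: // key = additionalKey, member = key, score = value
                zsets.computeIfAbsent(additionalKey, k -> new TreeMap<>()).put(key, Double.valueOf(value));
                break;
            case ZREM: { // key = additionalKey, member = key; value is ignored
                Map<String, Double> members = zsets.get(additionalKey);
                if (members != null) {
                    members.remove(key);
                }
                break;
            }
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + cmd);
        }
    }

    public static void main(String[] args) {
        invoke(Cmd.HSET, "person_message", "Alice", "20:woman:165.0");
        invoke(Cmd.ZADD, "scores", "Alice", "95.5");
        invoke(Cmd.ZREM, "scores", "Alice", null);
        System.out.println(hashes); // {person_message={Alice=20:woman:165.0}}
        System.out.println(zsets);  // {scores={}}
    }
}
```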

3 RedisSink Execution Flow

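  1. Create a FlinkJedisConfigBase object for the deployment mode (cluster, single-node, or Sentinel);
  2. Create a RedisMapper object that selects the Redis command and specifies how to extract the key and value;
  3. Pass both objects to the constructor to create the sink: new RedisSink(FlinkJedisConfigBase, RedisMapper);
  4. When the job starts, Flink calls the sink's open() method. It calls RedisCommandsContainerBuilder, which creates a RedisCommandsContainer based on the type of the FlinkJedisConfigBase object:
    • FlinkJedisClusterConfig produces a RedisClusterContainer.
    • FlinkJedisSentinelConfig and FlinkJedisPoolConfig both produce a RedisContainer.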

RedisClusterContainer and RedisContainer are the two implementations of the RedisCommandsContainer interface and carry out the actual Redis operations.
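Step 4 is a type-based dispatch. The Java model below mirrors it with hypothetical stand-in types; the real config and container classes live in the connector and wrap Jedis objects. A cluster config yields the cluster container, while pool and Sentinel configs both yield the single-endpoint container. Returning class names as strings is a simplification for illustration.

```java
class ContainerBuilderDemo {
    // Hypothetical stand-ins for the three FlinkJedisConfigBase subclasses
    interface JedisConfig {}
    static final class PoolConfig implements JedisConfig {}
    static final class SentinelConfig implements JedisConfig {}
    static final class ClusterConfig implements JedisConfig {}

    // Returns the name of the container the real builder creates for this config type
    static String build(JedisConfig config) {
        if (config instanceof PoolConfig) {
            return "RedisContainer";        // backed by a JedisPool in the Bahir source
        } else if (config instanceof SentinelConfig) {
            return "RedisContainer";        // backed by a JedisSentinelPool
        } else if (config instanceof ClusterConfig) {
            return "RedisClusterContainer"; // backed by a JedisCluster
        }
        throw new IllegalArgumentException("Unsupported Jedis configuration: " + config);
    }

    public static void main(String[] args) {
        System.out.println(build(new PoolConfig()));     // RedisContainer
        System.out.println(build(new SentinelConfig())); // RedisContainer
        System.out.println(build(new ClusterConfig()));  // RedisClusterContainer
    }
}
```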

4 Example Application

Use Flink to read a text file and sink its records into Redis, with Redis running in single-node mode.

Complete code

package org.ourhome.streamapi

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * @Author Do
 * @Date 2020/4/18 21:19
 */
object RedisSinkTest {
  private val REDIS_KEY = "person_message"

  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val runType: String = params.get("runtype")
    println("runType: " + runType)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///D:/Work/Code/flinkdev/src/main/resources/checkpoint"))

    val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
    // Parse each line of inputStream into a Person
    val streaming: DataStream[Person] = inputStream.map(line => {
      println(line)
      val strings: Array[String] = line.split(",")
      Person(strings(0).trim, strings(1).trim.toInt, strings(2).trim, strings(3).trim.toFloat)
    })
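    // Redis connection config (single-node mode); replace "host" with the address of your Redis server
    val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("host")
      .setPort(6379)
      .setTimeout(30000) // timeout in milliseconds
      .build()
    // RedisSink writes are not transactional; HSET is idempotent, so records replayed after a restore simply overwrite the same fields
    streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))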

    env.execute("Redis Sink")
  }

  case class Person (name: String, age: Int, gender: String, height: Float)
  // Custom RedisMapper: writes every Person into the Redis hash REDIS_KEY via HSET
  class MyRedisMapper extends RedisMapper[Person] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
    }
    // Hash field: the person's name
    override def getKeyFromData(t: Person): String = {
      t.name
    }
    // Hash value: "age:gender:height"
    override def getValueFromData(t: Person): String = {
      s"${t.age}:${t.gender}:${t.height}"
    }

  }

}

customdata.txt:

小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3

Resulting Redis data (a hash stored under the key person_message):

row field value
1 小明 20:man:180.2
2 小红 22:woman:178.4
3 小黑 18:man:192.9
4 小兰 19:woman:188.0
5 小爱 30:woman:177.3
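The table can be reproduced offline. This stdlib-only Java sketch re-implements the Scala parsing step and the MyRedisMapper field/value logic. The `Person`, `parse`, `field`, `value`, and `toHash` names are hypothetical. The sketch builds the hash that the HSET writes leave under person_message. It needs neither Flink nor Redis, which makes it a quick way to check the mapping before deploying.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PersonMappingDemo {
    // Same fields as the Scala case class Person
    static final class Person {
        final String name; final int age; final String gender; final float height;
        Person(String name, int age, String gender, float height) {
            this.name = name; this.age = age; this.gender = gender; this.height = height;
        }
    }

    // Mirrors the map() step: split on commas, trim, convert the numeric columns
    static Person parse(String line) {
        String[] s = line.split(",");
        return new Person(s[0].trim(), Integer.parseInt(s[1].trim()), s[2].trim(), Float.parseFloat(s[3].trim()));
    }

    // Mirrors MyRedisMapper: field = name, value = age:gender:height
    static String field(Person p) { return p.name; }
    static String value(Person p) { return p.age + ":" + p.gender + ":" + p.height; }

    // Builds the hash that HSET person_message <field> <value> would leave behind
    static Map<String, String> toHash(List<String> lines) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (String line : lines) {
            Person p = parse(line);
            hash.put(field(p), value(p)); // a repeated name overwrites, as HSET does
        }
        return hash;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "小明,20,man,180.2", "小红,22,woman,178.4", "小黑,18,man,192.9",
            "小兰,19,woman,188.0", "小爱,30,woman,177.3");
        toHash(lines).forEach((f, v) -> System.out.println(f + " -> " + v));
    }
}
```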