主要内容:

  • 介绍实时计算PVUV项目

需求

根据埋点信息,包括移动端埋点和Web端埋点,实时统计T系统的PVUV变化情况。

  1. 实时显示:当天总PVUV变化
  2. 实时显示:当前每一个小时PVUV的变化
  3. 数据不能保证有序,需要做延时处理
  4. 数据源是kafka,PVUV数据来自两个topic,日志格式不同
  5. 数据量:移动端+Web 端 28-32G/天

分析

先分析可确定项:

  • 时间语义选择EventTime,利用数据自带的时间戳信息:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

提取时间戳字段并生成watermark(尽量放在靠近数据源的位置):

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogMsg](Time.seconds(2)) {
        override def extractTimestamp(element: LogMsg): Long = {
          element.time
        }
      })
  • 数据源DataSource使用KafkaSource,Consumer使用FlinkKafkaConsumer,topic参数使用List
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
		this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
	}
  • 根据不同的日志格式编写正则,从多个字段中匹配出子系统编码、用户ID、时间戳等有用字段封装成LogMag对象:
case class LogMsg(subSysCode:String = "", userId:String = "", time:Long = 0L, enabledflag: Int = 0)

kafkaData.filter(new FilterFunction[String] {
      override def filter(value: String): Boolean = {
        val subSysCode: Try[String] = getSubSysCode(value)
        if (subSysCode.isSuccess) {
          if (ICENTER_SUB_SYS_CODE == subSysCode.get) true else false
        } else false
      }
    }).map(line => {
      if (mapToLogMsg(line).isSuccess) {
        mapToLogMsg(line).get
      } else {
        LogMsg()
      }
    }).filter(_.enabledflag == 1)

在将数据封装成LogMsg时,使用了Try来处理异常,返回结果需要调用isSuccess方法判断是否存在异常,获取具体值时需要调用get方法。

匹配不同日志格式中的子系统编码信息:

def getSubSysCode(value: String): Try[String] = {
     Try {
      val logSplit: Array[String] = value.split("\\|")
      if (logSplit.length > 2) { // web端埋点
        logSplit(5)
      } else { // 移动端埋点
        (subSysCodeMobilePattern findFirstIn value).get.split("\"")(3)
      }
    }
  }

匹配不同日志格式中的时间戳和用户ID信息:

def mapToLogMsg(value: String): Try[LogMsg] = {
    val subSysCode: String = T_SUB_SYS_CODE
    Try {
      val logSplit: Array[String] = value.split("\\|")
      if (logSplit.length > 2) { // web端埋点
        val time: Long = dateFormat.parse(logSplit(0)).getTime
        val userId: String = (userIdWebPattern findFirstIn value).get.split("\\^")(1)
        LogMsg(subSysCode, userId, time, 1)
      } else { // 移动端埋点
        val time: Long = dateFormat.parse(logSplit(0)).getTime
        val userId: String = (userIdMobilePattern findFirstIn value).get.split("\"")(3)
        LogMsg(subSysCode, userId, time, 1)
      }
    }
  }

以上工作均为准备工作,重点在于选择哪种方式统计PVUV,以及数据存储位置。

方案

根据统计方式和数据存储位置的不同,可供选择的方案:

方案1

  • 统计方式:使用windowAll,窗口关闭时统计整个窗口内数据,PV使用计数器统计,UV使用Set去重。windowAll:Non-Keyed Window,不分组,将数据流中的所有元素分配到相应的窗口中,不可设置并行度,始终为1。
  • 存储位置:内存

该方案以空间换时间,处理速度相对较快,但处理过程中会将数据暂存在内存中,若是数据量特别大或者内存大小有限的情况下会造成OOM问题。若数据量不大时,也是一个不错的选择。

该方案若不设置触发器,则等待的过程中没有数据输出,即在计算一个小时内的PVUV时,只有到达整点时才有数据。所以无法满足需求1和需求2。但可以通过设置Trigger来触发计算,如可以设置固定时间间隔触发或者数据到达一定量时触发计算,如:ContinuousEventTimeTriggerCountTrigger

另外,该方案在计算PVUV时,若不借助外界存储,是不能统计全天的PVUV累积量的,只能统计每个小时的变化情况。

.timeWindowAll(Time.hours(24))
      .apply(new AllWindowFunction[LogMsg, PvUvCount, TimeWindow] {
        override def apply(window: TimeWindow, input: Iterable[LogMsg], out: Collector[PvUvCount]): Unit = {
          // 定义一个set  保存userId
          val userSet: Set[String] = Set[String]()
          // 定义一个计数器
          var userCount: Long = 0
          // 把当前所有userId收集到set,最后输出set大小
          for (eachLog <- input) {
            userSet += eachLog.userId
            userCount += 1
          }
          val windowStart: String = DATE_FORMAT.format(window.getStart)
          val windowEnd: String = DATE_FORMAT.format(window.getEnd)

          out.collect(PvUvCount(windowStart, windowEnd, userCount, userSet.size))
        }
      }).print("output data ")

方案2

  • 统计方式:使用timeWindow,可以设置多并行度,可以设置数据来一条处理一条,或处理多条。使用process函数时借助ProcessWindowFunction抽象类,该抽象类继承了RichFunction的一些方法,可选择使用open或者close来创建和关闭Redis连接。也可以通过addSink写入Redis。
  • 存储位置:Reids或者Hbase,这里选择redis。计算PV使用Hashmap数据结构的hincrBy方法执行+1操作,计算UV使用Set数据结构去重。

该方案可借助Redis计算PVUV。当天的总PV可以按照每小时的PV统计量进行累加,但是UV不能累加,可将每个小时的userId都放在同一个Set里。当窗口触发时,先执行PV+1操作,再将userId加入到Set,最后执行读取Set长度的操作,并记录到另一个hashmap里。

该方案可以满足需求上述全部需求。但是该方案在数据量特别大的特殊情况下,比如每秒几亿条数据,userId会撑爆Redis服务器的内存。但是在本需求提供的数据量下是没有问题的。

.keyBy(_.subSysCode)
      .timeWindow(Time.hours(24))
      .process(new ProcessWindowFunction[LogMsg, PvUvCount, String, TimeWindow] {
        var jedis: Jedis = _

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          this.jedis = new Jedis("", 6379)
        }

        override def close(): Unit = {
          super.close()
          if (this.jedis != null) {
            this.jedis.close()
          }
        }

        override def process(key: String, context: Context, elements: Iterable[LogMsg], out: Collector[PvUvCount]): Unit = {
          val userId: String = elements.last.userId
          jedis.hincrBy("key", context.window.getEnd.toString, 1)
          for (element <- elements) {
            // pv
            jedis.hincrBy("pvkey", context.window.getEnd.toString, 1)
            // uv
            jedis.sadd("uvkey", userId)
          }
          val uvCount: lang.Long = jedis.scard("uvkey")
          jedis.hset("uvkey-hours", context.window.getEnd.toString, uvCount.toString)
        }
      })

方案3

在方案2的基础上,使用布隆过滤器压缩处理userId,存储在位图,用一个位来表示userId。