• 指定分区从头消费
• 指定分区从最新offset消费
• 指定分区、offset消费

指定分区从头消费

package test.kafka.kafkaconsumer

import java.util.Properties
import java.{lang, util}

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import test.kafka.kafkaProperities

import scala.collection.JavaConversions._

/**
 * Demo consumer that subscribes to the configured topic, waits until partitions
 * have been assigned, seeks every assigned partition to its beginning offset and
 * then prints the topic, partition and offset of each record it receives.
 */
object ConsumerSpecificOffsets {
  private val KAFKA_PROPERITIES = new kafkaProperities()

  /**
   * Entry point: builds the consumer, positions it and polls indefinitely.
   *
   * The consumer is closed in a `finally` block so that it is released when any
   * call inside the `try` throws (the polling loop itself never exits normally).
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val kakConPros: Properties = KAFKA_PROPERITIES.getKakConPros
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](kakConPros)
    try {
      consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))

      var assignment: scala.collection.Set[TopicPartition] = Set()
      // Partition assignment is carried out inside poll(); keep polling until
      // assignment() reports at least one partition for this consumer.
      // NOTE(review): newer kafka-clients releases deprecate poll(long) in favour of
      // poll(java.time.Duration) — confirm against the client version in use.
      while (assignment.isEmpty) {
        consumer.poll(1000)
        // assignment() returns the partitions currently assigned to this consumer.
        assignment = consumer.assignment()
      }
      println("assignment: " + assignment)
      //-----------------------------------------------------------------------------------------------
      // Consume the assigned partitions from their beginning offsets.
      val beginningOffsets: util.Map[TopicPartition, lang.Long] = consumer.beginningOffsets(assignment)
      assignment.foreach(
        topicPartition => {
          val offsets: lang.Long = beginningOffsets.get(topicPartition)
          println("分区" + topicPartition + "从" + offsets + "开始消费!")
          consumer.seek(topicPartition, offsets)
        }
      )
      //-----------------------------------------------------------------------------------------------
      // Poll forever and print where each received record came from.
      while (true) {
        val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
        val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
        while (records.hasNext) {
          val record: ConsumerRecord[String, String] = records.next()
          println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
        }
      }
    } finally {
      // Release the consumer's resources on every exit path out of the try block.
      consumer.close()
    }
  }
}

输出:

assignment: Set(kafka_producer_test-1, kafka_producer_test-0, kafka_producer_test-2)
分区kafka_producer_test-1从108开始消费!
分区kafka_producer_test-0从45开始消费!
分区kafka_producer_test-2从47开始消费!
topic: kafka_producer_test partition: 1 offset: 108
topic: kafka_producer_test partition: 1 offset: 109
topic: kafka_producer_test partition: 1 offset: 110
topic: kafka_producer_test partition: 1 offset: 111
topic: kafka_producer_test partition: 1 offset: 112
topic: kafka_producer_test partition: 1 offset: 113
topic: kafka_producer_test partition: 1 offset: 114
topic: kafka_producer_test partition: 1 offset: 115
topic: kafka_producer_test partition: 1 offset: 116
topic: kafka_producer_test partition: 2 offset: 47
topic: kafka_producer_test partition: 2 offset: 48
topic: kafka_producer_test partition: 2 offset: 49
topic: kafka_producer_test partition: 2 offset: 50
topic: kafka_producer_test partition: 2 offset: 51
topic: kafka_producer_test partition: 2 offset: 52
topic: kafka_producer_test partition: 2 offset: 53
topic: kafka_producer_test partition: 2 offset: 54
topic: kafka_producer_test partition: 2 offset: 55
topic: kafka_producer_test partition: 2 offset: 56
topic: kafka_producer_test partition: 2 offset: 57
topic: kafka_producer_test partition: 0 offset: 45
topic: kafka_producer_test partition: 0 offset: 46
topic: kafka_producer_test partition: 0 offset: 47
topic: kafka_producer_test partition: 0 offset: 48
topic: kafka_producer_test partition: 0 offset: 49
topic: kafka_producer_test partition: 0 offset: 50
topic: kafka_producer_test partition: 0 offset: 51
topic: kafka_producer_test partition: 0 offset: 52
topic: kafka_producer_test partition: 0 offset: 53
topic: kafka_producer_test partition: 0 offset: 54

其中,

// Excerpt of the main listing: look up the beginning offset of every assigned
// partition, then seek each partition to that offset.
val beginningOffsets: util.Map[TopicPartition, lang.Long] = consumer.beginningOffsets(assignment)
    assignment.foreach(
      topicPartition => {
        // Offset reported by beginningOffsets for this partition.
        val offsets: lang.Long = beginningOffsets.get(topicPartition)
        println("分区" + topicPartition + "从" + offsets + "开始消费!")
        // Position the consumer so the next poll() for this partition starts at that offset.
        consumer.seek(topicPartition, offsets)
      }
    )

可简化为:

consumer.seekToBeginning(assignment)

指定分区从最新offset消费

用以下代码替换掉上面示例中两条“//------”分隔线之间的代码:

// Seek every assigned partition to the offset that endOffsets reports for it.
val latestByPartition: util.Map[TopicPartition, lang.Long] = consumer.endOffsets(assignment)
for (tp <- assignment) {
  val latest: lang.Long = latestByPartition.get(tp)
  println(s"分区${tp}从${latest}开始消费!")
  consumer.seek(tp, latest)
}

并可简化为:

consumer.seekToEnd(assignment)

指定分区、offset消费

指定从每个分区offset=20的位置开始消费(如果指定的offset超出了该分区当前有效的offset范围,消费位置会按照auto.offset.reset的配置被重置):

// Position every assigned partition at the same fixed offset (20).
val fixedOffset: lang.Long = 20
for (tp <- assignment) {
  println(s"分区${tp}从${fixedOffset}开始消费!")
  consumer.seek(tp, fixedOffset)
}