Kafka提交偏移量的方式:

  • 自动提交
  • 手动提交——同步
  • 手动提交——异步

自动提交

package test.kafka

import java.util.Properties

import org.apache.kafka.common.serialization.StringSerializer

class kafkaProperities {
  private val KFK_BROKERS = "host1:port1,host2:port2,post3:port3"

  def getKakConPros:Properties = {
    val kfkConProps: Properties = new Properties()
    kfkConProps.setProperty("bootstrap.servers", KFK_BROKERS)
    kfkConProps.setProperty("group.id", "kafka_consumer")
    kfkConProps.setProperty("enable.auto.commit", "true")
    kfkConProps.setProperty("auto.commit.interval.ms", "5000")
    kfkConProps.setProperty("key.deserializer", classOf[StringSerializer].getName)
    kfkConProps.setProperty("value.deserializer", classOf[StringSerializer].getName)

    kfkConProps
  }
}
package test.kafka.kafkaconsumer

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import test.kafka.kafkaProperities

object autoCommitOffsets {
  private val KAFKA_PROPERITIES = new kafkaProperities()

  def main(args: Array[String]): Unit = {
    val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
    consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
    try {
      while(true) {
        val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
        val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
        while (records.hasNext){
          val record: ConsumerRecord[String, String] = records.next()
          println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
        }
      }
    } finally {
      consumer.close()
    }
  }
}

手动提交——同步

修改getKakConPros的参数:

	kfkConProps.setProperty("enable.auto.commit", "false")
    //kfkConProps.setProperty("auto.commit.interval.ms", "5000")
package test.kafka.kafkaconsumer

import java.util
import java.util.Properties
import java.util.logging.Logger

import org.apache.kafka.clients.consumer.{CommitFailedException, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import test.kafka.kafkaProperities

object syncCommitOffsets {
  private val KAFKA_PROPERITIES = new kafkaProperities()
  def main(args: Array[String]): Unit = {
    val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
    consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
    while(true) {
      val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
      val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
      while (records.hasNext) {
        val record: ConsumerRecord[String, String] = records.next()
        println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
      }
    try {
      consumer.commitSync()
    } catch {
      case e: CommitFailedException => {
        println("commit failed!" + e)
      }
    }
    }
  }
}

手动提交——异步提交

package test.kafka.kafkaconsumer

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import test.kafka.kafkaProperities

object asyncCommitOffsets {
  private val KAFKA_PROPERITIES = new kafkaProperities()
  def main(args: Array[String]): Unit = {
    val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
    consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
    while(true) {
      val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
      val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
      while (records.hasNext){
        val record: ConsumerRecord[String, String] = records.next()
        println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
      }
    //consumer.commitAsync()
    //也支持回调
    consumer.commitAsync(new OffsetCommitCallback {
      override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
        if (exception != null) {
          println("Commit failed for offset " + offsets.values().iterator().next().offset())
        } else {
          println("Commit succeeded!")
        }
      }
    })
    }
  }
}