Spark-Kafka kafka.common.OffsetOutOfRangeException


Problem description

Before the National Day holiday I started a Spark Streaming job that consumed from Kafka; it was later stopped for unrelated reasons. When I restarted the job after the holiday, it failed with kafka.common.OffsetOutOfRangeException. At first I assumed a ZooKeeper restart was to blame, and switching to a new group.id did make things work again, but an article I read today explains the real cause.

Kafka cleans up its logs on a schedule

When a job starts, if the consumer group has consumed the topic before, its offsets are stored in ZooKeeper (ZK), and we normally read them back so consumption resumes where it left off. But Kafka also deletes old log segments on a schedule, say once a day. If the stored offset points to data consumed the day before yesterday, that offset may no longer fall within the range of data still on disk, and the consumer throws OffsetOutOfRangeException.
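To make the failure concrete, here is a minimal sketch (Kafka 0.10.1+ consumer API; the broker address, group id, and topic name are hypothetical, and the idea is the same whether offsets live in ZK or are committed to Kafka) that checks whether a group's saved offset has fallen below the partition's earliest offset still on disk:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // hypothetical broker
props.put("group.id", "my-group")                // hypothetical group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)       // hypothetical topic
// Earliest offset whose data is still on disk (moves forward as logs are cleaned).
val earliest: Long = consumer.beginningOffsets(Seq(tp).asJava).get(tp)
// Offset previously committed for this group, if any.
val committed = Option(consumer.committed(tp)).map(_.offset)
committed.foreach { c =>
  if (c < earliest)
    println(s"saved offset $c < earliest $earliest: consuming from it would throw OffsetOutOfRangeException")
}
consumer.close()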


Solution

The fix is simple: check whether the offset stored in ZK is smaller than the topic's smallest (earliest) available offset, and if it is, write the earliest offset back to ZK.

Code

import org.apache.spark.SparkException
// KafkaCluster is the offset-management helper from the spark-streaming-kafka
// 0.8 integration (it is private[spark], so it is usually copied into the
// application's own package); Config is this application's own config object.

def setOrUpdateOffsets(implicit topics: Set[String], kc: KafkaCluster): Unit = {
  topics.foreach { topic =>
    println("current topic: " + topic)
    val groupId = Config.kafkaConf.getOrElse("group.id", "")
    var hasConsumed = true
    val kafkaPartitionsE = kc.getPartitions(Set(topic))
    if (kafkaPartitionsE.isLeft)
      throw new SparkException("get kafka partitions failed: " + kafkaPartitionsE.left.get)
    val kafkaPartitions = kafkaPartitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, kafkaPartitions)
    if (consumerOffsetsE.isLeft) hasConsumed = false
    if (hasConsumed) {
      // The group has consumed before. If the streaming job then hits
      // kafka.common.OffsetOutOfRangeException, the offsets saved in ZK are
      // stale: Kafka's scheduled log cleanup has already deleted the segments
      // containing them. Compare the ZK offsets with leaderEarliestOffsets;
      // if they are smaller, overwrite them with leaderEarliestOffsets.
      val leaderEarliestOffsets = kc.getEarliestLeaderOffsets(kafkaPartitions).right.get
      println(leaderEarliestOffsets)
      val consumerOffsets = consumerOffsetsE.right.get
      val flag = consumerOffsets.forall {
        case (tp, n) => n < leaderEarliestOffsets(tp).offset
      }
      if (flag) {
        println("consumer group:" + groupId + " offsets are stale, resetting to leaderEarliestOffsets")
        val offsets = leaderEarliestOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      } else {
        println("consumer group:" + groupId + " offsets are valid, nothing to update")
      }
    } else {
      // The group has never consumed this topic: start from the latest offsets.
      val leaderLatestOffsets = kc.getLatestLeaderOffsets(kafkaPartitions).right.get
      println(leaderLatestOffsets)
      println("consumer group:" + groupId + " has no offsets yet, setting leaderLatestOffsets")
      val offsets = leaderLatestOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }
      kc.setConsumerOffsets(groupId, offsets)
    }
  }
}
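The snippet above targets the 0.8 direct API. On the Kafka 0.10 integration (spark-streaming-kafka-0-10), where offsets are committed to Kafka itself, a sketch of an equivalent startup guard (the broker address, group id, and topic below are hypothetical) is to set an offset reset policy in the consumer parameters, so the consumer falls back to the earliest retained offset instead of throwing. Note this only covers stale offsets at startup; it does not cover the mid-batch race tracked in SPARK-19680, linked below.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("offset-reset-demo")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // hypothetical broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",                          // hypothetical group
  // Jump to the earliest offset still on disk instead of failing when the
  // committed offset has already been deleted by retention.
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Set("my-topic"), kafkaParams))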
   

The error (here from the Kafka 0.10 consumer, where the exception class is org.apache.kafka.clients.consumer.OffsetOutOfRangeException):

18/02/19 12:31:39 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 39)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {prensa4-0=744}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
18/02/19 12:31:39 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 39, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {prensa4-0=744}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1461)
        at itcl.adquisicionDatos$$anonfun$main$3.apply(adquisicionDatos-gonvarri.scala:212)
        at itcl.adquisicionDatos$$anonfun$main$3.apply(adquisicionDatos-gonvarri.scala:200)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {prensa4-0=744}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        ... 3 more

Resolution

From a user who hit the same error: "We had the same issue. For us, the problem was:

The master creates tasks like "read from that topic-partition from offset X to offset Y" and passes those tasks to executors.

An executor receives a task and starts consuming data from the topic-partition. By that time, due to the topic configuration (time- or size-based retention), offset X has become unavailable. The KafkaConsumer created to read the range has no offset reset policy.

The result is the exception you see.

The solution: you need to tune your Kafka topic configuration (retention) and/or the speed of your Spark job (i.e. decrease the batch time, increase the number of executors)."
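For the "speed of your Spark job" half, a sketch using standard Spark Streaming settings (the values are illustrative, not recommendations): enable backpressure, cap the per-partition fetch rate, and shorten the batch interval so consumption stays ahead of log retention.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("keep-up-with-retention")
  // Let Spark adapt the ingest rate to the observed processing speed.
  .set("spark.streaming.backpressure.enabled", "true")
  // Upper bound on records fetched per partition per second (illustrative value).
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

// A shorter batch interval reduces how far consumption lags behind the log head.
val ssc = new StreamingContext(conf, Seconds(2))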

Two solutions for reference:

https://issues.apache.org/jira/browse/SPARK-19680

The following link explains it in more detail:

https://blog.csdn.net/xueba207/article/details/51174818

A code-based solution:

https://blog.csdn.net/zyj_2012/article/details/78665709
