Notes on fixing a Flink job that kept restarting
in Tutorial with 0 comment

Symptoms

Four jobs handle the same business workload. One of them hit an exception and kept restarting, as shown below:

(Screenshot: 2022-03-07T20:38:53.png)

Investigation

The service log contains the following:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1091)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1222)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.NumberFormatException}
    ... 12 more
Caused by: java.lang.NumberFormatException
    at java.math.BigDecimal.<init>(BigDecimal.java:497)
    at java.math.BigDecimal.<init>(BigDecimal.java:827)
    at scala.math.BigDecimal$.decimal(BigDecimal.scala:50)
    at scala.math.BigDecimal$.decimal(BigDecimal.scala:53)
    at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:341)
    at com.xxx.ProcessPedestrianVisitor.$anonfun$trajectoryReduce$4(PedestrianAnalyzer.scala:198)
    at com.xxx.ProcessPedestrianVisitor.$anonfun$trajectoryReduce$4$adapted(PedestrianAnalyzer.scala:198)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:247)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:246)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:258)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:258)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at com.xxx.ProcessPedestrianVisitor.$anonfun$trajectoryReduce$2(PedestrianAnalyzer.scala:198)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:70)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.xxx.ProcessPedestrianVisitor.$anonfun$trajectoryReduce$1(PedestrianAnalyzer.scala:197)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:70)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at com.xxx.ProcessPedestrianVisitor.trajectoryReduce(PedestrianAnalyzer.scala:196)
    at com.xxx.ProcessPedestrianVisitor.process(PedestrianAnalyzer.scala:120)
    at com.xxx.ProcessPedestrianVisitor.process(PedestrianAnalyzer.scala:98)
    at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
    at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onProcessingTime(EvictingWindowOperator.java:318)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
    ... 11 more

The error is:

Caused by: java.lang.NumberFormatException

The error is raised at:

com.xxx.ProcessPedestrianVisitor.$anonfun$trajectoryReduce$2(PedestrianAnalyzer.scala:198)

In other words, a number-format exception is thrown inside the trajectoryReduce function.
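Reading the frames above the failing line, the exception passes through scala.math.BigDecimal$.double2bigDecimal into java.math.BigDecimal.<init>, which suggests a Double was being implicitly converted to BigDecimal. That conversion throws NumberFormatException for non-finite values such as NaN or Infinity. The post does not say which value actually triggered it, so treat the following as a minimal sketch of the likely failure mode rather than the exact data involved. The object name and the toDecimal helper are made up for illustration:

```scala
import scala.util.Try

object NanToBigDecimalDemo {
  // Hypothetical helper: the explicit form of the Double -> BigDecimal
  // conversion that the implicit double2bigDecimal performs.
  def toDecimal(d: Double): Try[BigDecimal] = Try(BigDecimal.decimal(d))

  def main(args: Array[String]): Unit = {
    // One ordinary value and two non-finite values (assumed examples).
    val samples = List(1.5, Double.NaN, Double.PositiveInfinity)
    samples.foreach { d =>
      // Prints a Success for the finite value and a Failure for the others.
      println(s"$d -> ${toDecimal(d)}")
    }
  }
}
```

Because the conversion is implicit in the original code, nothing at the call site hints that it can throw, which is why the stack trace is the quickest way to spot it.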

Fix approach

Two steps: emergency recovery first, then a code fix.

Emergency recovery first:
1. Cancel the job
2. Start a new job
3. Watch whether the problem comes back

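Then the code fix:
1. Read through the function that throws the exception
2. Locate the point of failure and guard against it; in the end an if check keeps the bad data from entering the function
3. Test that the failure point is now avoided
4. Release the fix to production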
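The guard described in step 2 could look roughly like the sketch below. The real code in PedestrianAnalyzer.scala is not shown in this post, so Point, isValid and toDecimals are hypothetical names, and the assumption that the bad data is a NaN or infinite Double is inferred from the stack trace rather than confirmed:

```scala
object FiniteGuardSketch {
  // Hypothetical stand-in for a trajectory point; the real type is not shown.
  final case class Point(x: Double, y: Double)

  // True only when both coordinates are ordinary finite numbers.
  def isFiniteValue(d: Double): Boolean = !d.isNaN && !d.isInfinity
  def isValid(p: Point): Boolean = isFiniteValue(p.x) && isFiniteValue(p.y)

  // Hypothetical downstream step that needs BigDecimal values.
  // Invalid points are dropped before any Double -> BigDecimal conversion runs.
  def toDecimals(points: List[Point]): List[(BigDecimal, BigDecimal)] =
    points.collect {
      case p if isValid(p) => (BigDecimal.decimal(p.x), BigDecimal.decimal(p.y))
    }
}
```

Dropping the record silently is the simplest option. Depending on the business, it may be worth counting or logging the skipped records so the upstream source of the bad values can be tracked down.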


👊 That's a wrap~
