Flink-SQL join optimization -- MiniBatch + local-global


Background

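  • Problem 1. While developing a Flink SQL job recently, I found that its tasks kept being retried after startup. After running for a while, the container heartbeat timed out and the JVM ran out of memory (GC overhead limit exceeded), so the job could not run normally.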
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping worker container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.internalStopWorker(ActiveResourceManager.java:461)
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping container container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.yarn.YarnResourceManagerDriver.releaseResource(YarnResourceManagerDriver.java:298)
2023-10-07 14:53:30,409 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] | Processing Event EventType: STOP_CONTAINER for Container container_e03_1678102291469_2749_01_000002 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:30,824 | WARN  | [flink-akka.actor.default-dispatcher-29] | Remote connection to [/10.155.0.9:42366] failed with java.io.IOException: Connection reset by peer | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-akka.actor.default-dispatcher-29] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-metrics-6] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:32,171 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers. | org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersAllocated$1(YarnResourceManagerDriver.java:620)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers with priority 1, 1 pending container requests. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:352)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Removing container request Capability[<memory:4096, vCores:3>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}]Resource Profile[null]. | org.apache.flink.yarn.YarnResourceManagerDriver.removeContainerRequest(YarnResourceManagerDriver.java:405)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:4096, vCores:3>. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:392)
2023-10-07 14:53:32,172 | INFO  | [cluster-io-thread-4] | TaskExecutor container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) will be started on node-group-1jPmk0002.mrs-qrmc.com with TaskExecutorProcessSpec {cpuCores=3.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb (429496736 bytes), numSlots=6}. | org.apache.flink.yarn.YarnResourceManagerDriver.createTaskExecutorLaunchContext(YarnResourceManagerDriver.java:472)
2023-10-07 14:53:32,178 | INFO  | [cluster-io-thread-4] | Creating container launch context for TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:500)
2023-10-07 14:53:32,179 | INFO  | [cluster-io-thread-4] | Starting TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:520)
2023-10-07 14:53:32,182 | INFO  | [flink-akka.actor.default-dispatcher-29] | Requested worker container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) with resource spec WorkerResourceSpec {cpuCores=3.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes), numSlots=6}. | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$1(ActiveResourceManager.java:419)
2023-10-07 14:53:32,182 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #2] | Processing Event EventType: START_CONTAINER for Container container_e03_1678102291469_2749_01_000003 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:32,554 | INFO  | [flink-akka.actor.default-dispatcher-29] | Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out. | org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1372)
2023-10-07 14:53:32,558 | INFO  | [flink-akka.actor.default-dispatcher-29] | ConstraintEnforcer[20] (1/6) (1a65cb345c504de7b9704270217856a7) switched from RUNNING to FAILED on container_e03_1678102291469_2749_01_000002 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 14:53:32,567 | INFO  | [flink-akka.actor.default-dispatcher-29] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
    at org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1432)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1124)
    at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:908)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1318)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1277)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:733)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373)
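
The TaskExecutorProcessSpec line in the log above lists how the 4096 MB container is divided among memory components. As a rough check (a sketch added for illustration, not part of the original troubleshooting), the components can be summed and the task heap compared with numSlots=6. The byte values are copied from that log line. The function names are hypothetical, and the even per-slot split is only an assumption to give a feel for the numbers: Flink shares the JVM heap among all slots of a TaskManager and does not partition it.

```python
# Byte values copied from the TaskExecutorProcessSpec log line above.
SPEC_BYTES = {
    "frameworkHeapSize": 134217728,
    "frameworkOffHeapSize": 134217728,
    "taskHeapSize": 1530082070,
    "taskOffHeapSize": 0,
    "networkMemSize": 359703515,
    "managedMemorySize": 1438814063,
    "jvmMetaspaceSize": 268435456,
    "jvmOverheadSize": 429496736,
}
NUM_SLOTS = 6  # numSlots from the same log line

MIB = 1024 * 1024


def total_process_mib(spec):
    """Sum every memory component and convert bytes to MiB."""
    return sum(spec.values()) / MIB


def jvm_heap_mib(spec):
    """JVM heap is the framework heap plus the task heap."""
    return (spec["frameworkHeapSize"] + spec["taskHeapSize"]) / MIB


def task_heap_per_slot_mib(spec, num_slots):
    """Illustrative even split of task heap; Flink does not enforce this."""
    return spec["taskHeapSize"] / num_slots / MIB


if __name__ == "__main__":
    print(f"total process memory : {total_process_mib(SPEC_BYTES):.1f} MiB")
    print(f"JVM heap             : {jvm_heap_mib(SPEC_BYTES):.1f} MiB")
    print(f"task heap per slot   : {task_heap_per_slot_mib(SPEC_BYTES, NUM_SLOTS):.1f} MiB")
```

The total comes to exactly 4096 MiB, matching the <memory:4096, vCores:3> container request in the log. With six slots in one TaskManager, each subtask has only a few hundred MiB of heap to work with if the load is spread evenly, which is little room for operators that hold data on the heap.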

  • Problem 2. In runs where the container heartbeat did not time out, the job ran slowly; after more than a day it was still under backpressure.

Problem analysis

  • The logs show an out-of-memory error (GC overhead limit exceeded) in the LookupJoin operator:
2023-10-07 17:58:27,526 | INFO  | [Checkpoint Timer] | Triggering checkpoint 11 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1696672707523 for job f80d756ccb03ed8d6c19d114e0ba9e63. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.createPendingCheckpoint(CheckpointCoordinator.java:832)
2023-10-07 17:59:31,393 | INFO  | [flink-metrics-4] | No response from remote for outbound association. Handshake timed out after [60000 ms]. | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$3(Slf4jLogger.scala:96)
2023-10-07 17:59:31,396 | WARN  | [flink-metrics-4] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852]] Caused by: [No response from remote for outbound association. Handshake timed out after [60000 ms].] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
**2023-10-07 18:00:15,005 | INFO  | [flink-akka.actor.default-dispatcher-22] | LookupJoin[29] (3/3) (803c659fe1f2ef1c5886f73bab51a914) switched from RUNNING to FAILED on container_e03_1678102291469_2752_01_000006 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.lang.OutOfMemoryError: GC overhead limit exceeded**
	at org.apache.flink.core.memory.MemorySegmentFactory.wrap(MemorySegmentFactory.java:48) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.binary.BinaryStringData.fromBytes(BinaryStringData.java:95) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.StringData.fromBytes(StringData.java:59) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.columnar.ColumnarRowData.getString(ColumnarRowData.java:123) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$794/901417218.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$788/1820263644.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:107) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at LookupFunction$606.flatMap(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.fetchElements(LookupJoinRunner.java:90) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.lambda$processElement$0(LookupJoinRunner.java:76) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner$$Lambda$1805/947327052.run(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner.withRetry(AbstractLookupJoinRunner.java:104) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:71) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:31) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:40) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$743/1473507981.runDefaultAction(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task$$Lambda$1317/775704149.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2023-10-07 18:00:15,071 | INFO  | [flink-akka.actor.default-dispatcher-22] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
  • Checkpoint failure:
2023-10-07 15:03:42,678 | WARN  | [flink-akka.actor.default-dispatcher-36] | **Failed to trigger or complete checkpoint 6 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) |** org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1909) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1512) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1113) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1082) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:590) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:369) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:345) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:328) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:303) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
        at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Remote connection to [null] failed with java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331 | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331]] Caused by: [java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | INFO  | [SourceCoordinator-Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]] | Removing registered reader after failure for subtask 0 of source Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]. | org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$subtaskFailed$3(SourceCoordinator.java:267)
2023-10-07 15:03:42,681 | INFO  | [flink-akka.actor.default-dispatcher-36] | Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]
  • Akka timeout
2023-10-07 15:02:01,864 | INFO  | [flink-scheduler-1] | Triggering Checkpoint 5 for job 0595b0727d9241894b541fd6e82af814 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:681)
2023-10-07 15:02:01,865 | WARN  | [Checkpoint Timer] | Failed to trigger or complete checkpoint 5 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) | org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:691) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:914) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:299) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:729) ~[?:?]
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:479) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.unbatchedExecute(SameThreadExecutionContext.scala:21) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.execute(SameThreadExecutionContext.scala:20) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[?:?]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.triggerCheckpoint(RpcTaskManagerGateway.java:128) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpointHelper(Execution.java:854) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpoint(Execution.java:830) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerTasks(CheckpointCoordinator.java:745) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpointRequest(CheckpointCoordinator.java:678) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:645) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_332]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]

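Job tuning

First round of tuning

  • Given the three causes above, and since there was no data skew, moderately increase memory, the checkpoint timeout and interval, and the Akka ask (RPC) timeout.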

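-- Increase memory (taskmanager.heap.size is a legacy key; Flink 1.10+ prefers taskmanager.memory.process.size)
set 'taskmanager.heap.size' = '5092MB';

-- Increase the checkpoint timeout and interval
set 'execution.checkpointing.timeout' = '60min';
set 'execution.checkpointing.interval' = '10 s';

-- Increase the Akka ask (RPC) timeout
set 'akka.ask.timeout' = '30 s';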
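  • These changes did not solve the problem: the Flink SQL job's tasks were still being retried, with new attempts appearing continuously. Further analysis showed that the join between the data stream and the external dimension table was slow. Removing the dimension table as a test made the backpressure disappear quickly. With the direction confirmed, the next round of tuning targeted the join specifically.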

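Second round of tuning

  • First, to address the out-of-memory error, shrink the dimension table as much as possible. Based on business requirements and data-modeling needs, build a new dimension table that contains only the required fields, which reduces the amount of data cached during the join.
For example, if dimension table dim_a has col1, col2, col3, col4, col5, col6 and only col2 and col6 are needed, build a dimension table with just col2 and col6.
  • Second, to reduce the work done for metric computation, use two-phase aggregation (aggregate by bucket and group key first, then aggregate by group key) together with mini-batch (batched computation). At the cost of somewhat higher latency, computing metrics in batches improved efficiency substantially: before the change, backpressure was still present after the job had run for 4 d 40 min; after the change, backpressure disappeared completely within 35 min.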
set 'table.exec.mini-batch.enabled' = 'true';
set 'table.exec.mini-batch.allow-latency' = '10 s';
set 'table.exec.mini-batch.size' = '5000';
set 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
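
The two points above can be sketched together as follows. This is only an illustration under stated assumptions, not the job from this article: the table names fact_stream and dim_a_slim, the columns user_id and amount, the datagen source, the JDBC connector, its URL, and the cache sizes are all hypothetical placeholders. The idea is that the dimension table declares only col2 (the join key) and col6, and that the grouped aggregation after the lookup join is the kind of query the mini-batch and TWO_PHASE settings are meant to help.

```sql
-- Hypothetical fact stream; datagen stands in for the real source.
CREATE TEMPORARY TABLE fact_stream (
  user_id   STRING,
  col2      STRING,            -- join key into the dimension table
  amount    BIGINT,
  proc_time AS PROCTIME()      -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- Hypothetical slim dimension table: only col2 and col6 are declared,
-- so each cached lookup row carries two fields instead of six.
CREATE TEMPORARY TABLE dim_a_slim (
  col2 STRING,
  col6 STRING
) WITH (
  'connector' = 'jdbc',                        -- assumed connector
  'url' = 'jdbc:mysql://dim-host:3306/dw',     -- placeholder URL
  'table-name' = 'dim_a_slim',                 -- placeholder table name
  'lookup.cache.max-rows' = '10000',           -- illustrative cache bounds
  'lookup.cache.ttl' = '10 min'
);

-- Lookup join followed by a group aggregation. With mini-batch enabled and
-- agg-phase-strategy set to TWO_PHASE, the planner can split this aggregation
-- into a local and a global phase.
SELECT d.col6,
       COUNT(*)      AS cnt,
       SUM(f.amount) AS total_amount
FROM fact_stream AS f
JOIN dim_a_slim FOR SYSTEM_TIME AS OF f.proc_time AS d
  ON f.col2 = d.col2
GROUP BY d.col6;
```

Whether a lookup cache is appropriate depends on how stale the dimension data may be; the values shown are not recommendations.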

Before/after comparison

  • Before optimization

    [Figure: Flink-SQL join optimization -- MiniBatch + local-global]

  • After optimization
    [Figure: Flink-SQL join optimization -- MiniBatch + local-global]
  • As the figure below shows, on the left side, once mini-batch and two-phase aggregation are applied, the amount of data processed drops noticeably.
    [Figure: Flink-SQL join optimization -- MiniBatch + local-global]
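
One way to check that the two-phase settings actually took effect, without waiting for the job graph, is to inspect the plan with EXPLAIN. The sketch below is a minimal example and not the author's query: the events table, its columns k and v, and the datagen source are hypothetical. With the settings applied, the optimized plan would be expected to show separate local and global aggregation nodes (LocalGroupAggregate and GlobalGroupAggregate) rather than a single GroupAggregate.

```sql
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

-- Hypothetical source used only to make the plan inspectable.
CREATE TEMPORARY TABLE events (
  k STRING,
  v BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- Print the plan; look for a local aggregation before the shuffle
-- and a global aggregation after it.
EXPLAIN PLAN FOR
SELECT k, COUNT(*) AS cnt, SUM(v) AS total
FROM events
GROUP BY k;
```

If the plan still shows a single aggregation node, it is worth checking that mini-batch is enabled in the same session, since the local-global split depends on it.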

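Summary

  • When troubleshooting, analyze the specific log messages and combine them with an understanding of how Flink runs and with the monitoring UI to narrow down, step by step, which operator is causing the problem.
  • Reference: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tuning/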