-
Notifications
You must be signed in to change notification settings - Fork 5k
[Fix-17817] [Master] Fix workflow timeout alerts failed #17819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
0b705ad
3de2674
876e3bd
dd87e65
e460464
58dd475
65e7d02
b123cb9
3057bf1
ee1a419
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkState; | ||
|
|
||
| import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; | ||
| import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; | ||
| import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent; | ||
| import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; | ||
| import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { | ||
|
|
||
| private final IWorkflowExecutionRunnable workflowExecutionRunnable; | ||
|
||
|
|
||
| protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, | ||
| final long timeout) { | ||
| super(timeout); | ||
| this.workflowExecutionRunnable = workflowExecutionRunnable; | ||
| } | ||
|
|
||
| @Override | ||
| public IWorkflowExecutionRunnable getWorkflowExecutionRunnable() { | ||
| return workflowExecutionRunnable; | ||
| } | ||
|
|
||
| public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) { | ||
| final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); | ||
| checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring."); | ||
|
|
||
| final int timeout = workflowInstance.getTimeout(); | ||
| checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout); | ||
|
|
||
| // Calculate remaining time until timeout: timeout - elapsed time | ||
| long delayTime = TimeUnit.MINUTES.toMillis(timeout) | ||
| - (System.currentTimeMillis() - workflowInstance.getStartTime().getTime()); | ||
| return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); | ||
| } | ||
|
|
||
| @Override | ||
| public ILifecycleEventType getEventType() { | ||
| return WorkflowLifecycleEventType.TIMEOUT; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "WorkflowTimeoutLifecycleEvent{" + | ||
| "workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() + | ||
| ", timeout=" + delayTime + | ||
| '}'; | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In our environment, we've already resolved the task timeout alerts and workflow timeout alerts. Here's how I determine whether a workflow has completed: final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In my opinion, If the workflow is already in a completed state in memory, does that mean timeout handling is no longer meaningful? |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,77 @@ | ||||||||||||
| /* | ||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||||||||
| * this work for additional information regarding copyright ownership. | ||||||||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||
| * (the "License"); you may not use this file except in compliance with | ||||||||||||
| * the License. You may obtain a copy of the License at | ||||||||||||
| * | ||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
| * | ||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||
| * limitations under the License. | ||||||||||||
| */ | ||||||||||||
|
|
||||||||||||
| package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; | ||||||||||||
|
|
||||||||||||
| import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; | ||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; | ||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; | ||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; | ||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; | ||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; | ||||||||||||
| import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; | ||||||||||||
|
|
||||||||||||
| import lombok.extern.slf4j.Slf4j; | ||||||||||||
|
|
||||||||||||
| import org.springframework.stereotype.Component; | ||||||||||||
|
|
||||||||||||
| @Slf4j | ||||||||||||
| @Component | ||||||||||||
| public class WorkflowTimeoutLifecycleEventHandler | ||||||||||||
| extends | ||||||||||||
| AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> { | ||||||||||||
|
|
||||||||||||
| private final WorkflowAlertManager workflowAlertManager; | ||||||||||||
|
|
||||||||||||
| public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) { | ||||||||||||
| this.workflowAlertManager = workflowAlertManager; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Override | ||||||||||||
| public void handle(final IWorkflowStateAction workflowStateAction, | ||||||||||||
| final IWorkflowExecutionRunnable workflowExecutionRunnable, | ||||||||||||
| final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) { | ||||||||||||
| final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); | ||||||||||||
| final String workflowName = workflowExecutionRunnable.getName(); | ||||||||||||
|
|
||||||||||||
| // Check if workflow is still active (not finished) | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
| if (workflowInstance.getState().isFinalState()) { | ||||||||||||
| log.info("The workflow {} has been finished with state: {}, skip timeout alert.", | ||||||||||||
| workflowName, | ||||||||||||
| workflowInstance.getState().name()); | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Check if warning group is configured | ||||||||||||
| if (workflowInstance.getWarningGroupId() == null) { | ||||||||||||
| log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", workflowName); | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+59
to
+63
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
If the warningGroupIf is null, don't create event. |
||||||||||||
|
|
||||||||||||
| log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName); | ||||||||||||
| doWorkflowTimeoutAlert(workflowInstance); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) { | ||||||||||||
| workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Override | ||||||||||||
| public ILifecycleEventType matchEventType() { | ||||||||||||
| return WorkflowLifecycleEventType.TIMEOUT; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Being equal to zero doesn't seem reasonable either.
final int timeout = workflowInstance.getTimeout();
checkState(timeout > 0, "The workflow timeout: %s must >0 minutes", timeout);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it shouldn't be 0, but I need to double-check.