Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
package org.apache.dolphinscheduler.dao.repository.impl;

import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;

import java.util.Date;
import java.util.List;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -66,4 +72,49 @@ void testSendServerStoppedAlert() {
.count();
Assertions.assertEquals(1L, count);
}

@Test
void testSendWorkflowTimeoutAlert() {
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(1);
workflowInstance.setName("test-workflow-timeout");
workflowInstance.setWorkflowDefinitionCode(100L);
workflowInstance.setCommandType(CommandType.START_PROCESS);
workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
workflowInstance.setStartTime(new Date());
workflowInstance.setHost("localhost");
workflowInstance.setWarningGroupId(1);

ProjectUser projectUser = new ProjectUser();
projectUser.setProjectCode(1L);
projectUser.setProjectName("test-project");
projectUser.setUserName("admin");

alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser);

List<Alert> alerts = alertDao.listPendingAlerts(-1);
Assertions.assertNotNull(alerts);

long timeoutAlertCount = alerts.stream()
.filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType()))
.filter(alert -> alert.getWorkflowInstanceId() != null
&& alert.getWorkflowInstanceId().equals(workflowInstance.getId()))
.count();
Assertions.assertEquals(1L, timeoutAlertCount);

Alert timeoutAlert = alerts.stream()
.filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType()))
.filter(alert -> alert.getWorkflowInstanceId() != null
&& alert.getWorkflowInstanceId().equals(workflowInstance.getId()))
.findFirst()
.orElse(null);

Assertions.assertNotNull(timeoutAlert);
Assertions.assertEquals("Workflow Timeout Warn", timeoutAlert.getTitle());
Assertions.assertEquals(projectUser.getProjectCode(), timeoutAlert.getProjectCode());
Assertions.assertEquals(workflowInstance.getWorkflowDefinitionCode(),
timeoutAlert.getWorkflowDefinitionCode());
Assertions.assertEquals(workflowInstance.getId(), timeoutAlert.getWorkflowInstanceId());
Assertions.assertTrue(timeoutAlert.getContent().contains("test-workflow-timeout"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,9 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType {
* Finalize the workflow instance.
*/
FINALIZE,
/**
* The workflow instance timeout
*/
TIMEOUT,

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being equal to zero doesn't seem reasonable either.

final int timeout = workflowInstance.getTimeout();
checkState(timeout > 0, "The workflow timeout: %s must >0 minutes", timeout);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it shouldn't be 0, but I need to double-check.

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;

import static com.google.common.base.Preconditions.checkState;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import java.util.concurrent.TimeUnit;

public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent {

private final IWorkflowExecutionRunnable workflowExecutionRunnable;

protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final long timeout) {
super(timeout);
this.workflowExecutionRunnable = workflowExecutionRunnable;
}

@Override
public IWorkflowExecutionRunnable getWorkflowExecutionRunnable() {
return workflowExecutionRunnable;
}

public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring.");

final int timeout = workflowInstance.getTimeout();
checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout);

// Calculate remaining time until timeout: timeout - elapsed time
long delayTime = TimeUnit.MINUTES.toMillis(timeout)
- (System.currentTimeMillis() - workflowInstance.getStartTime().getTime());
return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime);
}

@Override
public ILifecycleEventType getEventType() {
return WorkflowLifecycleEventType.TIMEOUT;
}

@Override
public String toString() {
return "WorkflowTimeoutLifecycleEvent{" +
"workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() +
", timeout=" + delayTime +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;

Expand All @@ -37,12 +39,25 @@ public class WorkflowStartLifecycleEventHandler
public void handle(final IWorkflowStateAction workflowStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStartLifecycleEvent workflowStartEvent) {

workflowTimeoutMonitor(workflowExecutionRunnable);
workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent);
}

@Override
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.START;
}

private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
if (workflowInstance.getTimeout() <= 0) {
log.debug("The workflow {} timeout {} is not configured, skip timeout monitor.",
workflowInstance.getName(),
workflowInstance.getTimeout());
return;
}
workflowExecutionRunnable.getWorkflowEventBus()
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
}

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our environment, we've already resolved the task timeout alerts and workflow timeout alerts. Here's how I determine whether a workflow has completed:

final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
// all the TaskExecutionRunnable chain in the graph is finish, means the workflow is already finished.
return;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isFinalState() reflects the persistent final state, making it more reliable. isAllTaskExecutionRunnableChainFinish(), on the other hand, only reflects the task completion state in memory; it may not yet have transitioned to the final workflow state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isFinalState() reflects the persistent final state, making it more reliable. isAllTaskExecutionRunnableChainFinish(), on the other hand, only reflects the task completion state in memory; it may not yet have transitioned to the final workflow state.

In my opinion, If the workflow is already in a completed state in memory, does that mean timeout handling is no longer meaningful?

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WorkflowTimeoutLifecycleEventHandler
extends
AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {

private final WorkflowAlertManager workflowAlertManager;

public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) {
this.workflowAlertManager = workflowAlertManager;
}

@Override
public void handle(final IWorkflowStateAction workflowStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
final String workflowName = workflowExecutionRunnable.getName();

// Check if workflow is still active (not finished)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Check if workflow is still active (not finished)

if (workflowInstance.getState().isFinalState()) {
log.info("The workflow {} has been finished with state: {}, skip timeout alert.",
workflowName,
workflowInstance.getState().name());
return;
}

// Check if warning group is configured
if (workflowInstance.getWarningGroupId() == null) {
log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", workflowName);
return;
}
Comment on lines +59 to +63
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Check if warning group is configured
if (workflowInstance.getWarningGroupId() == null) {
log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", workflowName);
return;
}

If the warningGroupIf is null, don't create event.


log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName);
doWorkflowTimeoutAlert(workflowInstance);
}

private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) {
workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance);
}

@Override
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.TIMEOUT;
}
}
Loading