From 0df58cd78cc7df000664bf85d9c4330076a555cf Mon Sep 17 00:00:00 2001 From: logoutdhaval Date: Tue, 12 Mar 2024 18:05:36 +0530 Subject: [PATCH 1/4] [PHEE-617] Getting multiple callbacks on single batch transaction API call --- .../bulk/camel/routes/BatchAggregateRoute.java | 2 -- .../bulk/zeebe/worker/SendCallbackWorker.java | 14 +++++++++++++- src/main/resources/application.yaml | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java index 17b92c09..403913d2 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchAggregateRoute.java @@ -65,8 +65,6 @@ public void configure() throws Exception { } exchange.setProperty(COMPLETION_RATE, percentage); - - producerTemplate.send(RouteId.SEND_CALLBACK.getValue(), exchange); }).otherwise().log(LoggingLevel.ERROR, "Batch aggregate request unsuccessful").process(exchange -> { exchange.setProperty(BATCH_STATUS_FAILED, true); exchange.setProperty(ERROR_DESCRIPTION, exchange.getIn().getBody(String.class)); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java index a34df04b..723e64b2 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SendCallbackWorker.java @@ -1,16 +1,20 @@ package org.mifos.processor.bulk.zeebe.worker; import static org.mifos.processor.bulk.camel.config.CamelProperties.CALLBACK_RESPONSE_CODE; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CALLBACK_SUCCESS; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.CLIENT_CORRELATION_ID; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_RATE; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.COMPLETION_THRESHOLD; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_STATUS_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY; import java.util.Map; import org.apache.camel.Exchange; @@ -41,7 +45,15 @@ public void setup() { exchange.setProperty(COMPLETION_RATE, variables.get(COMPLETION_RATE)); exchange.setProperty(PHASES, variables.get(PHASES)); exchange.setProperty(PHASE_COUNT, variables.get(PHASE_COUNT)); - sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + exchange.setProperty(BATCH_ID, variables.get(BATCH_ID)); + exchange.setProperty(CLIENT_CORRELATION_ID, variables.get(CLIENT_CORRELATION_ID)); + Integer maxRetry = Integer.parseInt(variables.get(MAX_STATUS_RETRY).toString()); + Integer completionRate = Integer.parseInt(variables.get(COMPLETION_RATE).toString()); + Integer completionThreshold = Integer.parseInt(variables.get(COMPLETION_THRESHOLD).toString()); + Integer statusRetry = Integer.parseInt(variables.get(RETRY).toString()); + if (statusRetry >= maxRetry || completionRate >= completionThreshold) { + sendToCamelRoute(RouteId.SEND_CALLBACK, exchange); + } } Boolean callbackSuccess = exchange.getProperty(CALLBACK_SUCCESS, Boolean.class); if (callbackSuccess == null || !callbackSuccess) { diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index b2c65cb6..c4012d69 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -111,7 +111,7 @@ config: completion-threshold-check: enable: false completion-threshold: 95 # in percentage - max-retry: 15 #can be as high as 30 + max-retry: 3 #can be as high as 30 delay: 2 # in seconds deduplication: enabled: true From 1606c186d4f5a65bd72008b944903377e5d383fd Mon Sep 17 00:00:00 2001 From: logoutdhaval Date: Wed, 13 Mar 2024 15:37:48 +0530 Subject: [PATCH 2/4] [latest] error handle --- .../bulk/zeebe/worker/AggregateWorker.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java index 9bdd5d3f..d8aed3e7 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java @@ -47,19 +47,20 @@ public void setup() { sendToCamelRoute(RouteId.BATCH_AGGREGATE, exchange); Boolean batchStatusFailed = exchange.getProperty(BATCH_STATUS_FAILED, Boolean.class); - if (batchStatusFailed == null || !batchStatusFailed) { - if (exchange.getException() != null && exchange.getException().getMessage() != null - && exchange.getException().getMessage().contains("404")) { - logger.error("An error occurred, retrying"); - successRate = 0; - } else { - successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); - } - } else { + // if (batchStatusFailed == null || !batchStatusFailed) { + if (exchange.getException() != null && exchange.getException().getMessage() != null + && exchange.getException().getMessage().contains("404")) { + logger.error("An error occurred, retrying"); + successRate = 0; variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION)); logger.info("Error: {}, {}", variables.get(ERROR_CODE), variables.get(ERROR_DESCRIPTION)); + } else { + successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); } + // } else { + + // } variables.put(COMPLETION_RATE, successRate); variables.put(RETRY, ++retry); From 24996d4cd66b0580d6baac4c5d4a531c0ae866ae Mon Sep 17 00:00:00 2001 From: logoutdhaval Date: Wed, 13 Mar 2024 16:10:14 +0530 Subject: [PATCH 3/4] logger --- .../org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java index d8aed3e7..9cf9c243 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java @@ -54,8 +54,9 @@ public void setup() { successRate = 0; variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION)); - logger.info("Error: {}, {}", variables.get(ERROR_CODE), variables.get(ERROR_DESCRIPTION)); + logger.info("Error cause: {}, message: {}", exchange.getException().getCause(), exchange.getException().getMessage()); } else { + logger.info("BATCH SUCCESS "); successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); } // } else { From 17f196a6b7576b092e44add4560ebf3a73df6273 Mon Sep 17 00:00:00 2001 From: logoutdhaval Date: Wed, 13 Mar 2024 16:12:13 +0530 Subject: [PATCH 4/4] logger --- .../mifos/processor/bulk/zeebe/worker/AggregateWorker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java index 9cf9c243..4188735b 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/AggregateWorker.java @@ -9,6 +9,7 @@ import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_CODE; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.ERROR_DESCRIPTION; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_CALLBACK_RETRY; +import static org.mifos.processor.bulk.zeebe.ZeebeVariables.MAX_STATUS_RETRY; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASES; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PHASE_COUNT; import static org.mifos.processor.bulk.zeebe.ZeebeVariables.RETRY; @@ -54,9 +55,9 @@ public void setup() { successRate = 0; variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); variables.put(ERROR_DESCRIPTION, exchange.getProperty(ERROR_DESCRIPTION)); - logger.info("Error cause: {}, message: {}", exchange.getException().getCause(), exchange.getException().getMessage()); + logger.info("Retry: {} , Error cause: {}, message: {}",retry, exchange.getException().getCause(), exchange.getException().getMessage()); } else { - logger.info("BATCH SUCCESS "); + logger.info("BATCH SUCCESS retry: {} , and maxRetry: {}",retry,variables.get(MAX_STATUS_RETRY)); successRate = exchange.getProperty(COMPLETION_RATE, Long.class).intValue(); } // } else {