diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java index 85d9d2d9a0..88aa508a0b 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java @@ -74,7 +74,10 @@ public class C2OperationManager implements Runnable { @Override public void run() { processRestartState(); + processOperationsInLoop(); + } + private void processOperationsInLoop() { while (true) { C2Operation operation; try { @@ -91,10 +94,17 @@ public class C2OperationManager implements Runnable { continue; } - C2OperationAck c2OperationAck = operationHandler.handle(operation); - if (!requiresRestart(operationHandler, c2OperationAck)) { - LOGGER.debug("No restart is required. Sending ACK to C2 server {}", c2OperationAck); - sendAcknowledge(c2OperationAck); + C2OperationAck operationAck; + try { + operationAck = operationHandler.handle(operation); + } catch (Exception e) { + LOGGER.error("Failed to process operation " + operation, e); + continue; + } + + if (!requiresRestart(operationHandler, operationAck)) { + LOGGER.debug("No restart is required. Sending ACK to C2 server {}", operationAck); + sendAcknowledge(operationAck); continue; } @@ -109,8 +119,8 @@ public class C2OperationManager implements Runnable { try { C2OperationState failedState = restartState.get(); LOGGER.debug("Restart handler returned with a failed state {}", failedState); - c2OperationAck.setOperationState(failedState); - sendAcknowledge(c2OperationAck); + operationAck.setOperationState(failedState); + sendAcknowledge(operationAck); } finally { operationQueueDAO.cleanup(); LOGGER.debug("Heartbeats are enabled again"); diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 3528218520..677798976a 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -271,7 +271,7 @@ public class C2NifiClientService { } } - private RuntimeInfoWrapper generateRuntimeInfo() { + private synchronized RuntimeInfoWrapper generateRuntimeInfo() { AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest()); agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations()); return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus());