mirror of https://github.com/apache/nifi.git
NIFI-13568 MiNiFi Fix concurrent SAXParser issue in manifest parsing
Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com> This closes #9098.
This commit is contained in:
parent
b0f419be2c
commit
c31fb9d373
|
@ -74,7 +74,10 @@ public class C2OperationManager implements Runnable {
|
|||
@Override
|
||||
public void run() {
|
||||
processRestartState();
|
||||
processOperationsInLoop();
|
||||
}
|
||||
|
||||
private void processOperationsInLoop() {
|
||||
while (true) {
|
||||
C2Operation operation;
|
||||
try {
|
||||
|
@ -91,10 +94,17 @@ public class C2OperationManager implements Runnable {
|
|||
continue;
|
||||
}
|
||||
|
||||
C2OperationAck c2OperationAck = operationHandler.handle(operation);
|
||||
if (!requiresRestart(operationHandler, c2OperationAck)) {
|
||||
LOGGER.debug("No restart is required. Sending ACK to C2 server {}", c2OperationAck);
|
||||
sendAcknowledge(c2OperationAck);
|
||||
C2OperationAck operationAck;
|
||||
try {
|
||||
operationAck = operationHandler.handle(operation);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to process operation " + operation, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!requiresRestart(operationHandler, operationAck)) {
|
||||
LOGGER.debug("No restart is required. Sending ACK to C2 server {}", operationAck);
|
||||
sendAcknowledge(operationAck);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -109,8 +119,8 @@ public class C2OperationManager implements Runnable {
|
|||
try {
|
||||
C2OperationState failedState = restartState.get();
|
||||
LOGGER.debug("Restart handler returned with a failed state {}", failedState);
|
||||
c2OperationAck.setOperationState(failedState);
|
||||
sendAcknowledge(c2OperationAck);
|
||||
operationAck.setOperationState(failedState);
|
||||
sendAcknowledge(operationAck);
|
||||
} finally {
|
||||
operationQueueDAO.cleanup();
|
||||
LOGGER.debug("Heartbeats are enabled again");
|
||||
|
|
|
@ -271,7 +271,7 @@ public class C2NifiClientService {
|
|||
}
|
||||
}
|
||||
|
||||
private RuntimeInfoWrapper generateRuntimeInfo() {
|
||||
private synchronized RuntimeInfoWrapper generateRuntimeInfo() {
|
||||
AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest());
|
||||
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
|
||||
return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus());
|
||||
|
|
Loading…
Reference in New Issue