diff --git a/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java b/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java
new file mode 100644
index 0000000000..aba5244060
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.context;
+
+/**
+ * A context for retrieving information about the state of the cluster.
+ */
+public interface ClusterContext {
+
+ /**
+ * Retrieves the current state of the cluster connection of this node.
+ *
+ * @return True if this node is connected to the cluster. False otherwise.
+ */
+ boolean isConnectedToCluster();
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
index ebabc6853e..866ac282d8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
@@ -29,11 +29,26 @@ import java.util.Set;
public interface NodeTypeProvider {
/**
- * @return true if this instance is clustered, false otherwise.
+ * @return true if this instance is clustered, false otherwise.MockProcessContext
* Clustered means that a node is either connected or trying to connect to the cluster.
*/
boolean isClustered();
+ /**
+ * @return true if the expected state of clustering is true, false otherwise. Contrary to {{@link #isClustered()}}
+ * this does not dynamically change with the state of this node.
+ */
+ default boolean isConfiguredForClustering() {
+ return false;
+ }
+
+ /**
+ * @return true if this instances is clustered and connected to the cluster.
+ */
+ default boolean isConnected() {
+ return false;
+ }
+
/**
* @return true if this instance is the primary node in the cluster; false otherwise
*/
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index 4ce6367d0f..9784384b8a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.ClusterContext;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.scheduling.ExecutionNode;
@@ -35,7 +36,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
* thread-safe.
*
*/
-public interface ProcessContext extends PropertyContext {
+public interface ProcessContext extends PropertyContext, ClusterContext {
/**
* Retrieves the current value set for the given descriptor, if a value is
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 6457e3bfe2..01098df75f 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -69,7 +69,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
private volatile Set unavailableRelationships = new HashSet<>();
private volatile boolean isClustered;
+ private volatile boolean isConfiguredForClustering;
private volatile boolean isPrimaryNode;
+ private volatile boolean isConnected = true;
public MockProcessContext(final ConfigurableComponent component) {
this(component, null);
@@ -524,6 +526,11 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
return isClustered;
}
+ @Override
+ public boolean isConfiguredForClustering() {
+ return isConfiguredForClustering;
+ }
+
@Override
public boolean isPrimary() {
return isPrimaryNode;
@@ -533,9 +540,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
isClustered = clustered;
}
+ public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
+ this.isConfiguredForClustering = isConfiguredForClustering;
+ }
+
public void setPrimaryNode(boolean primaryNode) {
- if (!isClustered && primaryNode) {
- throw new IllegalArgumentException("Primary node is only available in cluster. Use setClustered(true) first.");
+ if (!isConfiguredForClustering && primaryNode) {
+ throw new IllegalArgumentException("Primary node is only available in cluster. Use setIsConfiguredForClustering(true) first.");
}
isPrimaryNode = primaryNode;
}
@@ -544,4 +555,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
public InputRequirement getInputRequirement() {
return inputRequirement;
}
+
+ public void setConnected(boolean connected) {
+ isConnected = connected;
+ }
+
+ @Override
+ public boolean isConnectedToCluster() {
+ return isConnected;
+ }
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0ef2203541..6e8011fbbb 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -928,11 +928,21 @@ public class StandardProcessorTestRunner implements TestRunner {
context.setClustered(clustered);
}
+ @Override
+ public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
+ context.setIsConfiguredForClustering(isConfiguredForClustering);
+ }
+
@Override
public void setPrimaryNode(boolean primaryNode) {
context.setPrimaryNode(primaryNode);
}
+ @Override
+ public void setConnected(final boolean isConnected) {
+ context.setConnected(isConnected);
+ }
+
@Override
public String getVariableValue(final String name) {
Objects.requireNonNull(name);
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 176cc98a37..d3b144677e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -933,11 +933,22 @@ public interface TestRunner {
*/
void setClustered(boolean clustered);
+ /**
+ * @param isConfiguredForClustering Specify if this test emulates running in an environment where the expected
+ * cluster state equals with the argument.
+ */
+ void setIsConfiguredForClustering(boolean isConfiguredForClustering);
+
/**
* @param primaryNode Specify if this test emulates running as a primary node
*/
void setPrimaryNode(boolean primaryNode);
+ /**
+ * @param isConnected Specify if this test emulates ongoing cluster connection
+ */
+ void setConnected(boolean isConnected);
+
/**
* Sets the value of the variable with the given name to be the given value. This exposes the variable
* for use by the Expression Language.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 33ca57891c..9debcf2559 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -528,8 +528,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
- eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider,
- eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
+ eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
+ repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
@@ -2266,6 +2266,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}
+ @Override
public boolean isConfiguredForClustering() {
return configuredForClustering;
}
@@ -2815,6 +2816,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return resourceClaimManager;
}
+ @Override
public boolean isConnected() {
rwLock.readLock().lock();
try {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 6a579e9f80..74d5d4f8d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -78,7 +78,7 @@ public class StandardReloadComponent implements ReloadComponent {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
- flowController.getEncryptor(), stateManager, () -> false);
+ flowController.getEncryptor(), stateManager, () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
} finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 5300cb48ce..8d9a9ffbe2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -270,4 +270,9 @@ public class ConnectableProcessContext implements ProcessContext {
public String getName() {
return connectable.getName();
}
+
+ @Override
+ public boolean isConnectedToCluster() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 52dc89c4b9..87e2762d22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventBasedWorker;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
+import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
@@ -67,6 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final StringEncryptor encryptor;
private final ExtensionManager extensionManager;
+ private final NodeTypeProvider nodeTypeProvider;
private volatile String adminYieldDuration = "1 sec";
@@ -75,7 +77,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
- final StringEncryptor encryptor, final ExtensionManager extensionManager) {
+ final StringEncryptor encryptor, final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
@@ -84,6 +86,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.encryptor = encryptor;
this.extensionManager = extensionManager;
+ this.nodeTypeProvider = nodeTypeProvider;
for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
@@ -205,7 +208,8 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StateManager stateManager = new TaskTerminationAwareStateManager(getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
- final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated);
+ final StandardProcessContext standardProcessContext = new StandardProcessContext(
+ procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated, nodeTypeProvider);
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index e88b59fab8..508be541c1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -304,7 +304,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
final Supplier processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
- this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
+ this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
final CompletableFuture future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@@ -344,7 +344,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(procNode, false);
StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
- this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
+ this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
LOG.info("Stopping {}", procNode);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), lifecycleState);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 45d0c8f412..cd268eab29 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -82,7 +82,8 @@ public class ConnectableTask {
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
- processContext = new StandardProcessContext((ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated);
+ processContext = new StandardProcessContext(
+ (ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated, flowController);
} else {
processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index a0a972146c..ab942340a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -527,7 +527,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), node.getProcessor().getClass(), node.getIdentifier())) {
- final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false);
+ final StandardProcessContext processContext = new StandardProcessContext(
+ node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
}
}
@@ -993,7 +994,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getProcessor().getClass(), processor.getIdentifier())) {
- final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false);
+ final StandardProcessContext processContext = new StandardProcessContext(
+ processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index fd25256e7a..d7030efa15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -51,15 +52,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService
private final StringEncryptor encryptor;
private final StateManager stateManager;
private final TaskTermination taskTermination;
+ private final NodeTypeProvider nodeTypeProvider;
private final Map properties;
- public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
- final TaskTermination taskTermination) {
+ public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
+ final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider) {
this.procNode = processorNode;
this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor;
this.stateManager = stateManager;
this.taskTermination = taskTermination;
+ this.nodeTypeProvider = nodeTypeProvider;
properties = Collections.unmodifiableMap(processorNode.getEffectivePropertyValues());
@@ -290,4 +293,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return procNode.getName();
}
+ @Override
+ public boolean isConnectedToCluster() {
+ return nodeTypeProvider.isConnected();
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index 0dd9cde198..0bf741aeea 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -142,7 +142,7 @@ public class TestStandardProcessorNode {
new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true);
- final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false);
+ final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
@Override
public void onTaskComplete() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
index 6e47264e55..09556eb945 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
@@ -387,7 +387,7 @@ public class ParametersIT extends FrameworkIntegrationTest {
usernamePassword.setProperties(properties);
final ProcessContext processContext = new StandardProcessContext(usernamePassword, getFlowController().getControllerServiceProvider(), getFlowController().getEncryptor(),
- getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false);
+ getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false, getFlowController());
final PropertyDescriptor descriptor = usernamePassword.getPropertyDescriptor("password");
final PropertyValue propertyValue = processContext.getProperty(descriptor);
final PropertyValue evaluatedPropertyValue = propertyValue.evaluateAttributeExpressions();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
index d97cf1cc92..ebcf002a6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
@@ -124,4 +124,9 @@ public class MockProcessContext implements ProcessContext {
public String getName() {
return null;
}
+
+ @Override
+ public boolean isConnectedToCluster() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
index 1de89bce99..743dccc9a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
@@ -530,4 +530,9 @@ public class StatelessProcessContext implements ProcessContext, ControllerServic
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}
+
+ @Override
+ public boolean isConnectedToCluster() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index c7efc1f46c..1e2c44a1d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -16,20 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -57,10 +43,25 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
@SideEffectFree
@TriggerSerially
@TriggerWhenEmpty
@@ -168,6 +169,7 @@ public class MonitorActivity extends AbstractProcessor {
private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
private final AtomicBoolean inactive = new AtomicBoolean(false);
+ private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
@@ -218,9 +220,13 @@ public class MonitorActivity extends AbstractProcessor {
latestReportedNodeState.set(timestamp);
}
+ protected final long getLatestSuccessTransfer() {
+ return latestSuccessTransfer.get();
+ }
+
private boolean isClusterScope(final ProcessContext context, boolean logInvalidConfig) {
if (SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isClustered()) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
return true;
}
if (logInvalidConfig) {
@@ -248,9 +254,19 @@ public class MonitorActivity extends AbstractProcessor {
final ComponentLog logger = getLogger();
final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
final boolean isClusterScope = isClusterScope(context, false);
+ final boolean isConnectedToCluster = context.isConnectedToCluster();
final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
final List flowFiles = session.get(50);
+ if (isClusterScope(context, true)) {
+ if (isReconnectedToCluster(isConnectedToCluster)) {
+ reconcileState(context);
+ connectedWhenLastTriggered.set(true);
+ } else if (!isConnectedToCluster) {
+ connectedWhenLastTriggered.set(false);
+ }
+ }
+
boolean isInactive = false;
long updatedLatestSuccessTransfer = -1;
StateMap clusterState = null;
@@ -262,7 +278,7 @@ public class MonitorActivity extends AbstractProcessor {
isInactive = (now >= previousSuccessMillis + thresholdMillis);
logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
- if (isInactive && isClusterScope) {
+ if (isInactive && isClusterScope && isConnectedToCluster) {
// Even if this node has been inactive, there may be other nodes handling flow actively.
// However, if this node is active, we don't have to look at cluster state.
try {
@@ -286,7 +302,7 @@ public class MonitorActivity extends AbstractProcessor {
sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
}
- if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
+ if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
lastInactiveMessage.set(System.currentTimeMillis());
FlowFile inactiveFlowFile = session.create();
@@ -315,6 +331,7 @@ public class MonitorActivity extends AbstractProcessor {
final long latestStateReportTimestamp = latestReportedNodeState.get();
if (isClusterScope
+ && isConnectedToCluster
&& (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
// We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
try {
@@ -354,7 +371,7 @@ public class MonitorActivity extends AbstractProcessor {
if (updatedLatestSuccessTransfer > -1) {
latestSuccessTransfer.set(updatedLatestSuccessTransfer);
}
- if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
+ if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
FlowFile activityRestoredFlowFile = session.create();
if (copyAttributes) {
@@ -397,10 +414,42 @@ public class MonitorActivity extends AbstractProcessor {
}
}
- private boolean shouldThisNodeReport(boolean isClusterScope, boolean isReportOnlyOnPrimary) {
- return !isClusterScope
- || !isReportOnlyOnPrimary
- || getNodeTypeProvider().isPrimary();
+ /**
+ * Will return true when the last known state is "not connected" and the current state is "connected". This might
+ * happen when during last @OnTrigger the node was not connected but currently it is (reconnection); or when the
+ * processor is triggered first time (initial connection).
+ *
+ * This second case is due to safety reasons: it is possible that during the first trigger the node is not connected
+ * to the cluster thus the default value of the #connected attribute is false and stays as false until it's proven
+ * otherwise.
+ *
+ * @param isConnectedToCluster Current state of the connection.
+ *
+ * @return The node connected between the last trigger and the current one.
+ */
+ private boolean isReconnectedToCluster( final boolean isConnectedToCluster) {
+ return !connectedWhenLastTriggered.get() && isConnectedToCluster;
}
+ private void reconcileState(final ProcessContext context) {
+ try {
+ final StateMap state = context.getStateManager().getState(Scope.CLUSTER);
+ final Map newState = new HashMap<>();
+ newState.putAll(state.toMap());
+
+ final long validLastSuccessTransfer = StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
+ ? latestSuccessTransfer.get()
+ : Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)), latestSuccessTransfer.get());
+
+ newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(validLastSuccessTransfer));
+ context.getStateManager().replace(state, newState, Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Could not reconcile state after (re)connection! Reason: " + e.getMessage());
+ throw new ProcessException(e);
+ }
+ }
+
+ private boolean shouldThisNodeReport(final boolean isClusterScope, final boolean isReportOnlyOnPrimary, final ProcessContext context) {
+ return !isClusterScope || ((!isReportOnlyOnPrimary || getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index aa2d289f11..667ffea0ce 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -109,11 +111,205 @@ public class TestMonitorActivity {
restoredFlowFile.assertAttributeNotExists("key1");
}
+ @Test
+ public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
+ // given
+ final String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+ runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, lastSuccessInCluster), Scope.CLUSTER);
+
+ // when
+ runner.enqueue("lorem ipsum");
+ runner.run(1, false);
+
+ // then
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+ final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertNotEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ }
+
+ @Test
+ public void testReconcileWhenSharedStateIsNotYetSet() throws Exception {
+ // given
+ final TestableProcessor processor = new TestableProcessor(0);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+
+ // when
+ runner.setConnected(false);
+ runner.enqueue("lorem ipsum");
+ runner.run(1, false, false);
+
+ // then
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ // when
+ runner.setConnected(true);
+ runner.run(1, false, false);
+
+ // then
+ final long tLocal = processor.getLatestSuccessTransfer();
+ final long tCluster = getLastSuccessFromCluster(runner);
+ assertEquals(tLocal, tCluster);
+ }
+
+ @Test
+ public void testReconcileAfterReconnectWhenPrimary() throws Exception {
+ // given
+ final TestableProcessor processor = new TestableProcessor(0);
+ final TestRunner runner = givenRunnerIsSetUpForReconcile(processor, true);
+
+ // when - First trigger will write last success transfer into cluster.
+ Thread.sleep(8);
+ runner.enqueue("lorem ipsum");
+ runNext(runner);
+ final long t1Local = processor.getLatestSuccessTransfer();
+ final long t1Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertEquals(t1Local, t1Cluster);
+ thenTransfersAre(runner, 1, 0, 0);
+
+ // when - At second trigger it's not connected, new last success transfer stored only locally.
+ Thread.sleep(20);
+ runner.setConnected(false);
+ runner.enqueue("lorem ipsum");
+ runNext(runner);
+ final long t2Local = processor.getLatestSuccessTransfer();
+ final long t2Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertNotEquals(t1Local, t2Local);
+ Assert.assertEquals(t1Local, t2Cluster);
+ thenTransfersAre(runner, 2, 0, 0);
+
+ // when - The third trigger is without flow file, but reconcile is triggered and value is written ot cluster.
+ Thread.sleep(20);
+ runner.setConnected(true);
+ runNext(runner);
+ final long t3Local = processor.getLatestSuccessTransfer();
+ final long t3Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertEquals(t3Local, t2Local);
+ Assert.assertEquals(t3Cluster, t2Local);
+ // Inactive message is being sent after the connection is back.
+ thenTransfersAre(runner,2, 1, 0);
+ }
+
+ @Test
+ public void testReconcileAfterReconnectWhenNotPrimary() throws Exception {
+ // given
+ final TestableProcessor processor = new TestableProcessor(0);
+ final TestRunner runner = givenRunnerIsSetUpForReconcile(processor, false);
+
+ // when - First trigger will write last success transfer into cluster.
+ Thread.sleep(8);
+ runner.enqueue("lorem ipsum");
+ runNext(runner);
+ final long t1Local = processor.getLatestSuccessTransfer();
+ final long t1Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertEquals(t1Local, t1Cluster);
+ thenTransfersAre(runner, 1, 0, 0);
+
+ // when - At second trigger it's not connected, new last success transfer stored only locally.
+ Thread.sleep(20);
+ runner.setConnected(false);
+ runner.enqueue("lorem ipsum");
+ runNext(runner);
+ final long t2Local = processor.getLatestSuccessTransfer();
+ final long t2Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertNotEquals(t1Local, t2Local);
+ Assert.assertEquals(t1Local, t2Cluster);
+ thenTransfersAre(runner, 2, 0, 0);
+
+ // when - The third trigger is without flow file, but reconcile is triggered and value is written ot cluster.
+ Thread.sleep(20);
+ runner.setConnected(true);
+ runNext(runner);
+ final long t3Local = processor.getLatestSuccessTransfer();
+ final long t3Cluster = getLastSuccessFromCluster(runner);
+
+ // then
+ Assert.assertEquals(t3Local, t2Local);
+ Assert.assertEquals(t3Cluster, t2Local);
+ // No inactive message because of the node is not primary
+ thenTransfersAre(runner, 2, 0, 0);
+ }
+
private void runNext(TestRunner runner) {
// Don't initialize, otherwise @OnScheduled is called and state gets reset
runner.run(1, false, false);
}
+ private TestRunner givenRunnerIsSetUpForReconcile(final TestableProcessor processor, final boolean isPrimary) {
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(isPrimary);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "10 millis");
+ return runner;
+ }
+
+ private Long getLastSuccessFromCluster(final TestRunner runner) throws IOException {
+ return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ }
+
+ private void thenTransfersAre(TestRunner runner, final int success, final int inactive, final int restored) {
+ if (success > 0 && inactive == 0 & restored == 0) {
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+ }
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, success);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, inactive);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, restored);
+ }
+
+ @Test
+ public void testNoReportingWhenDisconnected() {
+ // given
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(5)));
+
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "3 minutes");
+
+ // when
+ runner.setConnected(false);
+ runner.run(1, false);
+
+ // then
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+
+ // when
+ runner.setConnected(true);
+ runner.run(1, false);
+
+ // then
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ }
+
@Test
public void testFirstMessageWithInherit() throws InterruptedException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
@@ -258,7 +454,7 @@ public class TestMonitorActivity {
public void testClusterMonitorInvalidReportingNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -269,7 +465,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActive() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -291,7 +487,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(false);
+ runner.setIsConfiguredForClustering(false);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -311,7 +507,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -345,7 +541,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -379,7 +575,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActiveCopyAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -404,7 +600,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorInactivity() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -427,7 +623,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(false);
+ runner.setIsConfiguredForClustering(false);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -452,7 +648,7 @@ public class TestMonitorActivity {
final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -476,7 +672,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorInactivityOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -495,7 +691,8 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActivityRestoredBySelf() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
+
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -531,7 +728,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -567,7 +764,7 @@ public class TestMonitorActivity {
final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -606,7 +803,7 @@ public class TestMonitorActivity {
final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setClustered(false);
+ runner.setIsConfiguredForClustering(false);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -642,7 +839,7 @@ public class TestMonitorActivity {
public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -678,7 +875,7 @@ public class TestMonitorActivity {
final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -711,7 +908,7 @@ public class TestMonitorActivity {
@Test
public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
- runner.setClustered(true);
+ runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);