NIFI-13072: Fix MonitorActivity problems with cluster scope flow monitoring

This closes #8669.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Rajmund Takacs 2024-04-04 17:09:59 +02:00 committed by Peter Turcsanyi
parent 15696ad86a
commit bffacdec98
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 955 additions and 310 deletions

View File

@ -16,6 +16,20 @@
*/
package org.apache.nifi.processors.standard;
import static java.util.Collections.singletonMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -30,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@ -44,23 +57,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@SideEffectFree
@TriggerSerially
@ -72,10 +69,18 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttributes({
@WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
@WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")})
@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide." +
"If 'Copy Attribute' is set to true, then flow file attributes are also persisted.")
@Stateful(
scopes = { Scope.CLUSTER, Scope.LOCAL },
description = "MonitorActivity stores the last timestamp at each node as state, "
+ "so that it can examine activity at cluster wide. "
+ "If 'Copy Attribute' is set to true, then flow file attributes are also persisted. "
+ "In local scope, it stores last known activity timestamp if the flow is inactive."
)
public class MonitorActivity extends AbstractProcessor {
public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO = "CommonFlowActivityInfo.lastSuccessfulTransfer";
public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = "LocalFlowActivityInfo.lastSuccessfulTransfer";
public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
@ -113,6 +118,14 @@ public class MonitorActivity extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor RESET_STATE_ON_RESTART = new PropertyDescriptor.Builder()
.name("Reset State on Restart")
.description("When the processor gets started or restarted, if set to true, the initial state will always be active. "
+ "Otherwise, the last reported flow state will be preserved.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder()
.name("Inactivity Message")
.description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship")
@ -124,7 +137,7 @@ public class MonitorActivity extends AbstractProcessor {
public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Copy Attributes")
.description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file")
.required(false)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
@ -148,11 +161,7 @@ public class MonitorActivity extends AbstractProcessor {
" even if it's 'primary', NiFi act as 'all'.")
.required(true)
.allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
.addValidator(((subject, input, context) -> {
boolean invalid = REPORT_NODE_PRIMARY.equals(input) && SCOPE_NODE.equals(context.getProperty(MONITORING_SCOPE).getValue());
return new ValidationResult.Builder().subject(subject).input(input)
.explanation("'" + REPORT_NODE_PRIMARY + "' is only available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
}))
.dependsOn(MONITORING_SCOPE, SCOPE_CLUSTER)
.defaultValue(REPORT_NODE_ALL.getValue())
.build();
@ -170,18 +179,16 @@ public class MonitorActivity extends AbstractProcessor {
.description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a "
+ "period of inactivity")
.build();
public static final Charset UTF8 = Charset.forName("UTF-8");
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
private final AtomicBoolean inactive = new AtomicBoolean(false);
private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
private final AtomicLong lastInactiveMessage = new AtomicLong();
private final AtomicLong inactivityStartMillis = new AtomicLong(System.currentTimeMillis());
private final AtomicBoolean wasActive = new AtomicBoolean(true);
private volatile LocalFlowActivityInfo localFlowActivityInfo;
@Override
protected void init(final ProcessorInitializationContext context) {
@ -191,6 +198,7 @@ public class MonitorActivity extends AbstractProcessor {
properties.add(INACTIVITY_MESSAGE);
properties.add(ACTIVITY_RESTORED_MESSAGE);
properties.add(WAIT_FOR_ACTIVITY);
properties.add(RESET_STATE_ON_RESTART);
properties.add(COPY_ATTRIBUTES);
properties.add(MONITORING_SCOPE);
properties.add(REPORTING_NODE);
@ -217,27 +225,191 @@ public class MonitorActivity extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
resetLastSuccessfulTransfer();
inactive.set(false);
hasSuccessTransfer.set(false);
final long thresholdMillis = context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
final boolean resetStateOnRestart = context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
// Attempt to load last state by the time of stopping this processor. A local state only exists if
// the monitored flow was already inactive, when the processor was shutting down.
final String storedLastSuccessfulTransfer = resetStateOnRestart ? null : tryLoadLastSuccessfulTransfer(context);
if (storedLastSuccessfulTransfer != null) {
// Initialize local flow as being inactive since the stored timestamp.
localFlowActivityInfo = new LocalFlowActivityInfo(
getStartupTime(), thresholdMillis, copyAttributes, Long.parseLong(storedLastSuccessfulTransfer));
wasActive.set(localFlowActivityInfo.isActive());
inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
} else {
// Initialize local flow as being active. If there is no traffic, then it will eventually become inactive.
localFlowActivityInfo = new LocalFlowActivityInfo(
getStartupTime(), thresholdMillis, copyAttributes);
wasActive.set(true);
}
}
protected void resetLastSuccessfulTransfer() {
setLastSuccessfulTransfer(System.currentTimeMillis());
@OnStopped
public void onStopped(final ProcessContext context) {
if (getNodeTypeProvider().isConfiguredForClustering() && context.isConnectedToCluster()) {
// Shared state needs to be cleared, in order to avoid getting inactive markers right after starting the
// flow after a weekend stop. In single-node setup, there is no shared state to be cleared, but the line
// below would also wipe out the local state. Hence, the check.
final StateManager stateManager = context.getStateManager();
try {
stateManager.clear(Scope.CLUSTER);
} catch (IOException e) {
getLogger().error("Failed to clear cluster state" + e, e);
}
}
}
protected final void setLastSuccessfulTransfer(final long timestamp) {
latestSuccessTransfer.set(timestamp);
latestReportedNodeState.set(timestamp);
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ComponentLog logger = getLogger();
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
final boolean wasActive = this.wasActive.get();
final List<FlowFile> flowFiles = session.get(50);
if (!flowFiles.isEmpty()) {
final boolean firstKnownTransfer = !localFlowActivityInfo.hasSuccessfulTransfer();
final boolean flowStateMustBecomeActive = !wasActive || firstKnownTransfer;
localFlowActivityInfo.update(flowFiles.get(0));
if (isClusterScope && flowStateMustBecomeActive) {
localFlowActivityInfo.forceSync();
}
session.transfer(flowFiles, REL_SUCCESS);
logger.info("Transferred {} FlowFiles to 'success'", flowFiles.size());
} else {
context.yield();
}
if (isClusterScope) {
if (!wasActive || !localFlowActivityInfo.isActive()) {
localFlowActivityInfo.forceSync();
}
synchronizeState(context);
}
final long thresholdMillis = context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final boolean continuallySendMessages = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
final boolean waitForActivity = context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
final boolean isActive = localFlowActivityInfo.isActive() || !flowFiles.isEmpty();
final long lastActivity = localFlowActivityInfo.getLastActivity();
final long inactivityStartMillis = this.inactivityStartMillis.get();
final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() + thresholdMillis) <= System.currentTimeMillis();
final boolean canReport = !isClusterScope || isConnectedToCluster || !flowFiles.isEmpty();
final boolean canChangeState = !waitForActivity || localFlowActivityInfo.hasSuccessfulTransfer();
if (canReport && canChangeState) {
if (isActive) {
onTriggerActiveFlow(context, session, wasActive, isClusterScope, inactivityStartMillis);
} else if (wasActive || continuallySendMessages && timeToRepeatInactiveMessage) {
onTriggerInactiveFlow(context, session, isClusterScope, lastActivity);
}
this.wasActive.set(isActive);
this.inactivityStartMillis.set(lastActivity);
} else {
// We need to block state transition, because we are not connected to the cluster.
// When we reconnect, and the state persists, then the next onTrigger will do the transition.
logger.trace("State transition is blocked, because we are not connected to the cluster.");
}
}
protected final long getLatestSuccessTransfer() {
return latestSuccessTransfer.get();
protected long getStartupTime() {
return System.currentTimeMillis();
}
protected final long getLastSuccessfulTransfer() {
return localFlowActivityInfo.getLastSuccessfulTransfer();
}
private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
final StateManager stateManager = context.getStateManager();
try {
final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
} catch (IOException e) {
throw new ProcessException("Failed to load local state due to " + e, e);
}
}
private void synchronizeState(ProcessContext context) {
final ComponentLog logger = getLogger();
final boolean isConnectedToCluster = context.isConnectedToCluster();
if (isReconnectedToCluster(isConnectedToCluster)) {
localFlowActivityInfo.forceSync();
connectedWhenLastTriggered.set(true);
}
if (!isConnectedToCluster) {
connectedWhenLastTriggered.set(false);
} else if (localFlowActivityInfo.syncNeeded()) {
final CommonFlowActivityInfo commonFlowActivityInfo = new CommonFlowActivityInfo(context);
localFlowActivityInfo.update(commonFlowActivityInfo);
try {
commonFlowActivityInfo.update(localFlowActivityInfo);
localFlowActivityInfo.setNextSyncMillis();
} catch (final SaveSharedFlowStateException ex) {
logger.debug("Failed to update common state.", ex);
}
}
}
private void onTriggerInactiveFlow(ProcessContext context, ProcessSession session, boolean isClusterScope, long lastActivity) {
final ComponentLog logger = getLogger();
final boolean shouldThisNodeReport = shouldThisNodeReport(isClusterScope, context);
if (shouldThisNodeReport) {
sendInactivityMarker(context, session, lastActivity, logger);
}
lastInactiveMessage.set(System.currentTimeMillis());
setInactivityFlag(context.getStateManager());
}
private void onTriggerActiveFlow(ProcessContext context, ProcessSession session, boolean wasActive, boolean isClusterScope,
long inactivityStartMillis) {
final ComponentLog logger = getLogger();
final boolean shouldThisNodeReport = shouldThisNodeReport(isClusterScope, context);
if (!wasActive) {
if (shouldThisNodeReport) {
final Map<String, String> attributes = localFlowActivityInfo.getLastSuccessfulTransferAttributes();
sendActivationMarker(context, session, attributes, inactivityStartMillis, logger);
}
clearInactivityFlag(context.getStateManager());
}
}
private void setInactivityFlag(StateManager stateManager) {
try {
stateManager.setState(singletonMap(
STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
String.valueOf(localFlowActivityInfo.getLastActivity())
), Scope.LOCAL);
} catch (IOException e) {
getLogger().error("Failed to set local state due to " + e, e);
}
}
private void clearInactivityFlag(StateManager stateManager) {
try {
stateManager.clear(Scope.LOCAL);
} catch (IOException e) {
throw new ProcessException("Failed to clear local state due to " + e, e);
}
}
private boolean isClusterScope(final ProcessContext context, boolean logInvalidConfig) {
if (SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
if (SCOPE_CLUSTER.getValue().equals(context.getProperty(MONITORING_SCOPE).getValue())) {
if (getNodeTypeProvider().isConfiguredForClustering()) {
return true;
}
@ -250,191 +422,17 @@ public class MonitorActivity extends AbstractProcessor {
}
private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final ProcessContext context) {
if (REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
if (isClusterScope) {
return true;
}
if (REPORT_NODE_PRIMARY.getValue().equals(context.getProperty(REPORTING_NODE).getValue())) {
return isClusterScope;
}
return false;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final long thresholdMillis = context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final long now = System.currentTimeMillis();
final ComponentLog logger = getLogger();
final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
final boolean waitForActivity = context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
final List<FlowFile> flowFiles = session.get(50);
if (isClusterScope(context, true)) {
if (isReconnectedToCluster(isConnectedToCluster)) {
reconcileState(context);
connectedWhenLastTriggered.set(true);
} else if (!isConnectedToCluster) {
connectedWhenLastTriggered.set(false);
}
}
boolean isInactive = false;
long updatedLatestSuccessTransfer = -1;
StateMap clusterState = null;
if (flowFiles.isEmpty()) {
final long previousSuccessMillis = latestSuccessTransfer.get();
boolean sendInactiveMarker = false;
isInactive = (now >= previousSuccessMillis + thresholdMillis);
logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
if (isInactive && isClusterScope && isConnectedToCluster) {
// Even if this node has been inactive, there may be other nodes handling flow actively.
// However, if this node is active, we don't have to look at cluster state.
try {
clusterState = session.getState(Scope.CLUSTER);
if (clusterState != null && !StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
final long latestReportedClusterActivity = Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
isInactive = (now >= latestReportedClusterActivity + thresholdMillis);
if (!isInactive) {
// This node has been inactive, but other node has more recent activity.
updatedLatestSuccessTransfer = latestReportedClusterActivity;
}
logger.debug("isInactive={}, latestReportedClusterActivity={}", new Object[]{isInactive, latestReportedClusterActivity});
}
} catch (IOException e) {
logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
}
}
if (isInactive) {
final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
if (waitForActivity) {
sendInactiveMarker = sendInactiveMarker && hasSuccessTransfer.get();
}
}
if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
lastInactiveMessage.set(System.currentTimeMillis());
FlowFile inactiveFlowFile = session.create();
inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityStartMillis", String.valueOf(previousSuccessMillis));
inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
inactiveFlowFile = session.write(inactiveFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(outBytes);
}
});
session.getProvenanceReporter().create(inactiveFlowFile);
session.transfer(inactiveFlowFile, REL_INACTIVE);
logger.info("Transferred {} to 'inactive'", new Object[]{inactiveFlowFile});
} else {
context.yield(); // no need to dominate CPU checking times; let other processors run for a bit.
}
} else {
session.transfer(flowFiles, REL_SUCCESS);
hasSuccessTransfer.set(true);
updatedLatestSuccessTransfer = now;
logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
final long latestStateReportTimestamp = latestReportedNodeState.get();
if (isClusterScope
&& isConnectedToCluster
&& (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
// We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
try {
final StateMap state = session.getState(Scope.CLUSTER);
final Map<String, String> newValues = new HashMap<>();
// Persist attributes so that other nodes can copy it
if (copyAttributes) {
newValues.putAll(flowFiles.get(0).getAttributes());
}
newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
if (state == null || !state.getStateVersion().isPresent()) {
session.setState(newValues, Scope.CLUSTER);
} else {
final String existingTimestamp = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
if (StringUtils.isEmpty(existingTimestamp)
|| Long.parseLong(existingTimestamp) < now) {
// If this returns false due to race condition, it's not a problem since we just need
// the latest active timestamp.
session.replaceState(state, newValues, Scope.CLUSTER);
} else {
logger.debug("Existing state has more recent timestamp, didn't update state.");
}
}
latestReportedNodeState.set(now);
} catch (IOException e) {
logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
}
}
}
if (!isInactive) {
final long inactivityStartMillis = latestSuccessTransfer.get();
if (updatedLatestSuccessTransfer > -1) {
latestSuccessTransfer.set(updatedLatestSuccessTransfer);
}
if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
FlowFile activityRestoredFlowFile = session.create();
if (copyAttributes) {
final Map<String, String> attributes = new HashMap<>();
if (flowFiles.size() > 0) {
// copy attributes from the first flow file in the list
attributes.putAll(flowFiles.get(0).getAttributes());
} else if (clusterState != null) {
attributes.putAll(clusterState.toMap());
attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
}
// don't copy the UUID
attributes.remove(CoreAttributes.UUID.key());
activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
}
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(inactivityStartMillis));
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis));
final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));
session.getProvenanceReporter().create(activityRestoredFlowFile);
session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
logger.info("Transferred {} to 'activity.restored'", new Object[]{activityRestoredFlowFile});
}
}
}
@OnStopped
public void onStopped(final ProcessContext context) {
if (getNodeTypeProvider().isPrimary()) {
final StateManager stateManager = context.getStateManager();
try {
stateManager.clear(Scope.CLUSTER);
} catch (IOException e) {
getLogger().error("Failed to clear cluster state due to " + e, e);
}
}
}
/**
* Will return true when the last known state is "not connected" and the current state is "connected". This might
* happen when during last @OnTrigger the node was not connected but currently it is (reconnection); or when the
* happen when during last @OnTrigger the node was not connected, but currently it is (reconnection); or when the
* processor is triggered first time (initial connection).
*
* <br />
* This second case is due to safety reasons: it is possible that during the first trigger the node is not connected
* to the cluster thus the default value of the #connected attribute is false and stays as false until it's proven
* otherwise.
@ -447,25 +445,203 @@ public class MonitorActivity extends AbstractProcessor {
return !connectedWhenLastTriggered.get() && isConnectedToCluster;
}
private void reconcileState(final ProcessContext context) {
try {
final StateMap state = context.getStateManager().getState(Scope.CLUSTER);
final Map<String, String> newState = new HashMap<>();
newState.putAll(state.toMap());
private boolean shouldThisNodeReport(final boolean isClusterScope, final ProcessContext context) {
final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
return !isClusterScope || (!shouldReportOnlyOnPrimary || getNodeTypeProvider().isPrimary());
}
final long validLastSuccessTransfer = StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
? latestSuccessTransfer.get()
: Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)), latestSuccessTransfer.get());
private void sendInactivityMarker(ProcessContext context, ProcessSession session, long inactivityStartMillis,
ComponentLog logger) {
FlowFile inactiveFlowFile = session.create();
inactiveFlowFile = session.putAttribute(
inactiveFlowFile,
"inactivityStartMillis", String.valueOf(inactivityStartMillis)
);
inactiveFlowFile = session.putAttribute(
inactiveFlowFile,
"inactivityDurationMillis",
String.valueOf(System.currentTimeMillis() - inactivityStartMillis)
);
newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(validLastSuccessTransfer));
context.getStateManager().replace(state, newState, Scope.CLUSTER);
} catch (IOException e) {
getLogger().error("Could not reconcile state after (re)connection! Reason: " + e.getMessage());
throw new ProcessException(e);
final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
StandardCharsets.UTF_8);
inactiveFlowFile = session.write(inactiveFlowFile, out -> out.write(outBytes));
session.getProvenanceReporter().create(inactiveFlowFile);
session.transfer(inactiveFlowFile, REL_INACTIVE);
logger.info("Transferred {} to 'inactive'", inactiveFlowFile);
}
private void sendActivationMarker(ProcessContext context, ProcessSession session, Map<String, String> attributes,
long inactivityStartMillis, ComponentLog logger) {
FlowFile activityRestoredFlowFile = session.create();
// don't copy the UUID
attributes.remove(CoreAttributes.UUID.key());
activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(
inactivityStartMillis));
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
StandardCharsets.UTF_8);
activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));
session.getProvenanceReporter().create(activityRestoredFlowFile);
session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
logger.info("Transferred {} to 'activity.restored'", activityRestoredFlowFile);
}
private static class LocalFlowActivityInfo {
private static final long NO_VALUE = 0;
private final long startupTimeMillis;
private final long thresholdMillis;
private final boolean saveAttributes;
private long nextSyncMillis = NO_VALUE;
private long lastSuccessfulTransfer = NO_VALUE;
private Map<String, String> lastSuccessfulTransferAttributes = new HashMap<>();
public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes) {
this.startupTimeMillis = startupTimeMillis;
this.thresholdMillis = thresholdMillis;
this.saveAttributes = saveAttributes;
}
public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
this(startupTimeMillis, thresholdMillis, saveAttributes);
lastSuccessfulTransfer = initialLastSuccessfulTransfer;
}
public boolean syncNeeded() {
return nextSyncMillis <= System.currentTimeMillis();
}
public void setNextSyncMillis() {
nextSyncMillis = System.currentTimeMillis() + (thresholdMillis / 3);
}
public void forceSync() {
nextSyncMillis = System.currentTimeMillis();
}
public boolean isActive() {
if (hasSuccessfulTransfer()) {
return System.currentTimeMillis() < (lastSuccessfulTransfer + thresholdMillis);
} else {
return System.currentTimeMillis() < (startupTimeMillis + thresholdMillis);
}
}
public boolean hasSuccessfulTransfer() {
return lastSuccessfulTransfer != NO_VALUE;
}
public long getLastSuccessfulTransfer() {
return lastSuccessfulTransfer;
}
public long getLastActivity() {
if (hasSuccessfulTransfer()) {
return lastSuccessfulTransfer;
} else {
return startupTimeMillis;
}
}
public Map<String, String> getLastSuccessfulTransferAttributes() {
return lastSuccessfulTransferAttributes;
}
public void update(FlowFile flowFile) {
this.lastSuccessfulTransfer = System.currentTimeMillis();
if (saveAttributes) {
lastSuccessfulTransferAttributes = new HashMap<>(flowFile.getAttributes());
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());
}
}
public void update(CommonFlowActivityInfo commonFlowActivityInfo) {
if (!commonFlowActivityInfo.hasSuccessfulTransfer()) {
return;
}
final long lastSuccessfulTransfer = commonFlowActivityInfo.getLastSuccessfulTransfer();
if (lastSuccessfulTransfer <= getLastSuccessfulTransfer()) {
return;
}
this.lastSuccessfulTransfer = lastSuccessfulTransfer;
if (saveAttributes) {
lastSuccessfulTransferAttributes = commonFlowActivityInfo.getLastSuccessfulTransferAttributes();
}
}
}
private boolean shouldThisNodeReport(final boolean isClusterScope, final boolean isReportOnlyOnPrimary, final ProcessContext context) {
return !isClusterScope || ((!isReportOnlyOnPrimary || getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
private static class CommonFlowActivityInfo {
private final StateManager stateManager;
private final StateMap storedState;
private final Map<String, String> newState = new HashMap<>();
public CommonFlowActivityInfo(ProcessContext context) {
this.stateManager = context.getStateManager();
try {
storedState = stateManager.getState(Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("Cannot load common flow activity info.", e);
}
}
public boolean hasSuccessfulTransfer() {
return storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO) != null;
}
public long getLastSuccessfulTransfer() {
return Long.parseLong(storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
public Map<String, String> getLastSuccessfulTransferAttributes() {
final Map<String, String> result = new HashMap<>(storedState.toMap());
result.remove(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
return result;
}
public void update(LocalFlowActivityInfo localFlowActivityInfo) {
if (!localFlowActivityInfo.hasSuccessfulTransfer()) {
return;
}
final long lastSuccessfulTransfer = localFlowActivityInfo.getLastSuccessfulTransfer();
if (hasSuccessfulTransfer() && (lastSuccessfulTransfer <= getLastSuccessfulTransfer())) {
return;
}
newState.putAll(localFlowActivityInfo.getLastSuccessfulTransferAttributes());
newState.put(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(lastSuccessfulTransfer));
final boolean wasSuccessful;
try {
wasSuccessful = stateManager.replace(storedState, newState, Scope.CLUSTER);
} catch (IOException e) {
throw new SaveSharedFlowStateException("Caught exception while saving state.", e);
}
if (!wasSuccessful) {
throw new SaveSharedFlowStateException("Failed to save state. Probably there was a concurrent update.");
}
}
}
private static class SaveSharedFlowStateException extends ProcessException {
public SaveSharedFlowStateException(String message) {
super(message);
}
public SaveSharedFlowStateException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@ -16,15 +16,23 @@
*/
package org.apache.nifi.processors.standard;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
@ -33,16 +41,10 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMonitorActivity {
@Test
public void testFirstMessage() {
public void testFirstMessage() throws InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@ -53,7 +55,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@ -78,7 +80,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@ -101,19 +103,24 @@ public class TestMonitorActivity {
}
@Test
public void testFirstMessageWithWaitForActivityTrue() {
public void testFirstMessageWithWaitForActivityTrue() throws InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
runner.run(1, false);
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.enqueue(new byte[0]);
runner.run();
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@ -136,7 +143,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@ -157,7 +164,7 @@ public class TestMonitorActivity {
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
final String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final String lastSuccessInCluster = String.valueOf(currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
@ -165,14 +172,175 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, lastSuccessInCluster), Scope.CLUSTER);
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.enqueue("lorem ipsum");
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInput() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.run(1, false);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputButClusterIsActive() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis());
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.run(1, false);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasInactiveLastTime() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.LOCAL
);
runner.run(1, false);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
final StateMap updatedLocalState = runner.getStateManager().getState(Scope.LOCAL);
assertEquals(lastSuccessInCluster, updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasActiveLastTime() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis());
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
// if was active, there is no local state
runner.run(1, false);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedInactiveSinceLastTime() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
// if was active, there is no local state
runner.run(1, false);
final StateMap updatedClusterState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedClusterState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
final StateMap updatedLocalState = runner.getStateManager().getState(Scope.LOCAL);
assertEquals(lastSuccessInCluster, updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedActiveSinceLastTime() throws Exception {
final String lastSuccessInCluster = String.valueOf(currentTimeMillis());
final String lastSuccessInLocal = String.valueOf(currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, lastSuccessInCluster), Scope.CLUSTER);
runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
runner.getStateManager().setState(
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, lastSuccessInLocal), Scope.LOCAL
);
runner.run(1, false);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertNull(runner.getStateManager().getState(Scope.LOCAL).get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
@ -188,33 +356,41 @@ public class TestMonitorActivity {
runner.setConnected(false);
runner.enqueue("lorem ipsum");
runner.run(1, false, false);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
runner.setConnected(true);
runner.run(1, false, false);
runNext(runner);
final long tLocal = processor.getLatestSuccessTransfer();
final long tLocal = processor.getLastSuccessfulTransfer();
final long tCluster = getLastSuccessFromCluster(runner);
assertEquals(tLocal, tCluster);
}
@Test
public void testReconcileAfterReconnectWhenPrimary() throws InterruptedException {
public void testReconcileAfterReconnectWhenPrimary() throws InterruptedException, IOException {
final TestRunner runner = getRunnerScopeCluster(new MonitorActivity(), true);
final StateManager stateManager = runner.getStateManager();
// First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
runNext(runner);
runner.run(1, false);
final String lastSuccessTransferAfterFirstTrigger = stateManager.getState(Scope.CLUSTER)
.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
assertTransferCountSuccessInactiveRestored(runner, 1, 0);
// At second trigger it's not connected, new last success transfer stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to guarantee, that the stored timestamp will be different.
runNext(runner);
assertEquals(lastSuccessTransferAfterFirstTrigger,
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
// The third trigger is without flow file, but reconcile is triggered and value is written ot cluster.
@ -222,32 +398,46 @@ public class TestMonitorActivity {
TimeUnit.MILLISECONDS.sleep(500);
runNext(runner);
assertNotEquals(lastSuccessTransferAfterFirstTrigger,
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// Inactive message is being sent after the connection is back.
assertTransferCountSuccessInactiveRestored(runner,2, 1);
}
@Test
public void testReconcileAfterReconnectWhenNotPrimary() {
public void testReconcileAfterReconnectWhenNotPrimary() throws IOException, InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = getRunnerScopeCluster(processor, false);
final StateManager stateManager = runner.getStateManager();
// First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
runNext(runner);
runner.run(1, false);
final String lastSuccessTransferAfterFirstTrigger = stateManager.getState(Scope.CLUSTER)
.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
assertTransferCountSuccessInactiveRestored(runner, 1, 0);
// At second trigger it's not connected, new last success transfer stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to guarantee, that the stored timestamp will be different.
runNext(runner);
assertEquals(lastSuccessTransferAfterFirstTrigger,
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
// The third trigger is without flow file, but reconcile is triggered and value is written ot cluster.
runner.setConnected(true);
runNext(runner);
assertNotEquals(lastSuccessTransferAfterFirstTrigger,
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// No inactive message because of the node is not primary
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
}
@ -269,7 +459,7 @@ public class TestMonitorActivity {
}
private Long getLastSuccessFromCluster(final TestRunner runner) throws IOException {
return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
private void assertTransferCountSuccessInactiveRestored(TestRunner runner, final int success, final int inactive) {
@ -303,7 +493,7 @@ public class TestMonitorActivity {
}
@Test
public void testFirstMessageWithInherit() {
public void testFirstMessageWithInherit() throws InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@ -311,12 +501,12 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
runner.enqueue(new byte[0]);
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
runner.clearTransferState();
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@ -346,7 +536,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
processor.resetLastSuccessfulTransfer();
TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@ -383,7 +573,7 @@ public class TestMonitorActivity {
rerun = false;
runner.setProperty(MonitorActivity.THRESHOLD, threshold + " millis");
Thread.sleep(1000L);
TimeUnit.MILLISECONDS.sleep(1000L);
// shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer if @OnSchedule & onTrigger
// does not get called more than MonitorActivity.THRESHOLD apart
@ -410,30 +600,18 @@ public class TestMonitorActivity {
*/
private static class TestableProcessor extends MonitorActivity {
private final long timestampDifference;
private final long startupTime;
public TestableProcessor(final long timestampDifference) {
this.timestampDifference = timestampDifference;
this.startupTime = currentTimeMillis() - timestampDifference;
}
@Override
public void resetLastSuccessfulTransfer() {
setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference);
protected long getStartupTime() {
return startupTime;
}
}
@Test
public void testClusterMonitorInvalidReportingNode() {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.assertNotValid();
}
@Test
public void testClusterMonitorActive() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
@ -445,12 +623,12 @@ public class TestMonitorActivity {
runner.enqueue("Incoming data");
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// Should be null because COPY_ATTRIBUTES is null.
assertNull(updatedState.get("key1"));
assertNull(updatedState.get("key2"));
@ -472,7 +650,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
@Test
@ -488,21 +666,22 @@ public class TestMonitorActivity {
// Set future timestamp in state
final HashMap<String, String> existingState = new HashMap<>();
final long existingTimestamp = System.currentTimeMillis() - 1_000;
existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
final long existingTimestamp = currentTimeMillis() - 1_000;
existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(existingTimestamp));
existingState.put("key1", "value1");
existingState.put("key2", "value2");
runner.getStateManager().setState(existingState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
assertTrue( existingTimestamp < Long.parseLong(postProcessedState.get(
MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
long postProcessedTimestamp = Long.parseLong(postProcessedState.get(
MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertTrue(existingTimestamp < postProcessedTimestamp);
// State should be updated. Null in this case.
assertNull(postProcessedState.get("key1"));
assertNull(postProcessedState.get("key2"));
@ -521,22 +700,22 @@ public class TestMonitorActivity {
// Set future timestamp in state
final HashMap<String, String> existingState = new HashMap<>();
final long existingTimestamp = System.currentTimeMillis() + 10_000;
existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
final long existingTimestamp = currentTimeMillis() + 10_000;
existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(existingTimestamp));
existingState.put("key1", "value1");
existingState.put("key2", "value2");
runner.getStateManager().setState(existingState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(
String.valueOf(existingTimestamp),
postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
postProcessedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// State should stay the same.
assertEquals(postProcessedState.get("key1"), existingState.get("key1"));
assertEquals(postProcessedState.get("key2"), existingState.get("key2"));
@ -557,12 +736,12 @@ public class TestMonitorActivity {
attributes.put("key2", "value2");
runner.enqueue("Incoming data", attributes);
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
}
@ -689,7 +868,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@ -723,7 +902,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@ -762,7 +941,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@ -801,7 +980,7 @@ public class TestMonitorActivity {
// Latest activity should NOT be persisted
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.clearTransferState();
}
@ -812,22 +991,25 @@ public class TestMonitorActivity {
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Activity restored, even if this node doesn't have activity, other node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(System.currentTimeMillis()));
clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
// Common state is not sampled on each trigger. We need to wait a little to get notified about the update.
TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
runNext(runner);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@ -849,22 +1031,25 @@ public class TestMonitorActivity {
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive
runner.run();
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Activity restored, even if this node doesn't have activity, other node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(System.currentTimeMillis()));
clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
// Common state is not sampled on each trigger. We need to wait a little to get notified about the update.
TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
runNext(runner);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@ -892,7 +1077,7 @@ public class TestMonitorActivity {
// Activity restored, even if this node doesn't have activity, other node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(System.currentTimeMillis()));
clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
@ -906,4 +1091,288 @@ public class TestMonitorActivity {
}
}
@Test
public void testDisconnectedNodeActivatesFlow() {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Disconnect the node, and feed in a flow file
runner.setConnected(false);
runner.enqueue("Incoming data");
runNext(runner);
// We expect both activation marker, and forwarded FF
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
}
@Test
public void testDisconnectedNodeDeactivatesFlowOnlyWhenConnected() {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(false);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.clearTransferState();
// Disconnect the node, and expect marker
runner.setConnected(true);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
}
@Test
public void testLocalStateIsNotDeletedInStandaloneCaseWhenStopped() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
// Stop the processor and expect the local state still there
runner.stop();
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
}
@Test
public void testLocalStateIsNotDeletedInClusteredCaseNodeScopeWhenStopped() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
// Stop the processor and expect the local state still there
runner.stop();
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
}
@Test
public void testLocalStateIsNotDeletedInClusteredCaseClusterScopeWhenStopped() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
// Stop the processor and expect the local state still there
runner.stop();
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
}
@Test
public void testLocalStateIsNotDeletedInClusteredCaseWhenDisconnectedAndStopped() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
// Disconnect the node & stop the processor and expect the local state still there
runner.setConnected(false);
runner.stop();
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
}
@Test
public void testActivationMarkerIsImmediateWhenAnyOtherNodeActivatesTheFlow() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.DAYS.toMillis(1)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Update the cluster state
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(currentTimeMillis())), Scope.CLUSTER);
runNext(runner);
// We expect activation marker only
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_ACTIVITY_RESTORED);
}
@Test
public void testDisconnectNodeAndActivateBothTheOtherNodesAndTheDisconnectedNodeIndependently() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.DAYS.toMillis(1)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Disconnect the node, and feed in a flow file to activate it
runner.setConnected(false);
runner.enqueue("Incoming data");
runNext(runner);
// We expect both activation marker, and forwarded FF
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
runner.clearTransferState();
// Update the cluster state too, and reconnect. This simulates other nodes being activated.
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(currentTimeMillis())), Scope.CLUSTER);
runner.setConnected(true);
runNext(runner);
// We expect no output
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
public void testClusterStateIsImmediatelyUpdatedOnActivation() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.DAYS.toMillis(1)));
runner.setIsConfiguredForClustering(true);
runner.setConnected(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
assertNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.enqueue("Incoming data");
runNext(runner);
assertNotNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
@Test
public void testResetStateOnStartupByDefault() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
runner.setIsConfiguredForClustering(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
runner.getStateManager().setState(
singletonMap(
MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis() - TimeUnit.DAYS.toMillis(1))
),
Scope.LOCAL
);
runner.enqueue("Incoming data");
runner.run();
// We expect only the FF as output
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
}
@Test
public void testResetStateOnStartupDisabled() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
runner.setIsConfiguredForClustering(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
runner.getStateManager().setState(
singletonMap(
MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis() - TimeUnit.DAYS.toMillis(1))
),
Scope.LOCAL
);
runner.enqueue("Incoming data");
runner.run();
// We expect only the FF as output
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
}
@Test
public void testMultipleFlowFilesActivateTheFlowInSingleTriggerResultsInSingleMarker() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.DAYS.toMillis(1)));
runner.setIsConfiguredForClustering(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
// Becomes inactive
runner.run(1, false);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Adding flow files
runner.enqueue("Incoming data 1");
runner.enqueue("Incoming data 2");
runner.enqueue("Incoming data 3");
runNext(runner);
// We expect only the FF as output
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
}
}