NIFI-8136: Added getState/setState/replaceState/clearState methods to ProcessSession, updated processors to use these methods instead of StateManager version where appropriate

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4757
Mark Payne authored 2021-01-13 08:40:10 -05:00; committed by Matthew Burgess
parent b9076ca26e
commit 097edf4f7c
40 changed files with 986 additions and 525 deletions

File: ProcessSession.java

@ -16,14 +16,8 @@
*/
package org.apache.nifi.processor;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.FlowFileAccessException;
@ -35,6 +29,16 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
/**
* <p>
* A process session encompasses all the behaviors a processor can perform to
@ -868,4 +872,55 @@ public interface ProcessSession {
* @return the provenance reporter
*/
ProvenanceReporter getProvenanceReporter();
/**
* Updates the value of the component's state, setting it to the given value. This method does not push the new value to the
* remote State Provider but rather caches the value until {@link #commit()} is called. At that point, it will publish the
* state to the remote State Provider, if the state is the latest according to the remote State Provider.
*
* @param state the value to change the state to
* @param scope the scope to use when storing the state
* @throws IOException if unable to communicate with the underlying storage mechanism
*/
void setState(Map<String, String> state, Scope scope) throws IOException;
/**
* Returns the current state for the component. This return value will never be <code>null</code>.
* If the state has not yet been set, the StateMap's version will be -1, and the map of values will be empty.
*
* @param scope the scope to use when fetching the state
* @return the current state for the component
* @throws IOException if unable to communicate with the underlying storage mechanism
*/
StateMap getState(Scope scope) throws IOException;
/**
* Updates the value of the component's state to the new value if and only if the value currently
* is the same as the given oldValue. The oldValue will be compared against the value of the state as it is
* known to the Process Session. If the Process Session does not currently know the state, it will be fetched
* from the StateProvider.
*
* The value will not be provided to any remote state provider until {@link #commit()} is called. At that point,
* if the value that has been set by this method is the most up-to-date value, according to the state provider,
* then the remote state provider will be updated to match the given <code>newValue</code>.
*
* @param oldValue the old value to compare against
* @param newValue the new value to use if and only if the state's value is the same as the given oldValue
* @param scope the scope to use for storing the new state
* @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not
* equal to oldValue
*
* @throws IOException if unable to communicate with the underlying storage mechanism
*/
boolean replaceState(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException;
/**
* Clears all keys and values from the component's state when the session is committed.
*
* @param scope the scope whose values should be cleared
*
* @throws IOException if unable to communicate with the underlying storage mechanism.
*/
void clearState(Scope scope) throws IOException;
}

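To make the new contract concrete, here is a minimal sketch of a processor built on the session-scoped state methods above. The class name, the "count" key, and the retry-on-next-trigger behavior are illustrative assumptions, not part of this commit.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@Stateful(scopes = Scope.CLUSTER, description = "Stores a running trigger count")
public class CountingProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try {
            // Never null; the version is -1 and the map empty if state was never set.
            final StateMap oldState = session.getState(Scope.CLUSTER);
            final long count = Long.parseLong(oldState.toMap().getOrDefault("count", "0"));

            final Map<String, String> newState = new HashMap<>(oldState.toMap());
            newState.put("count", String.valueOf(count + 1));

            // Optimistic update: succeeds only if the state still matches oldState.
            // The new value is cached by the session and published to the State
            // Provider when commit() is called.
            if (!session.replaceState(oldState, newState, Scope.CLUSTER)) {
                getLogger().debug("State changed concurrently; deferring to the next trigger");
            }
        } catch (final IOException e) {
            throw new ProcessException("Failed to access processor state", e);
        }
    }
}

Because the session defers publication until commit(), rolling the session back discards the pending state update along with the rest of the session's work.
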
File: MockStateManager.java

@ -17,16 +17,17 @@
package org.apache.nifi.state;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.junit.Assert;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class MockStateManager implements StateManager {
private final AtomicInteger versionIndex = new AtomicInteger(0);
@ -38,6 +39,10 @@ public class MockStateManager implements StateManager {
private volatile boolean failToSetLocalState = false;
private volatile boolean failToGetClusterState = false;
private volatile boolean failToSetClusterState = false;
private volatile boolean ignoreAnnotations = false;
private final AtomicLong localRetrievedCount = new AtomicLong(0L);
private final AtomicLong clusterRetrievedCount = new AtomicLong(0L);
private final boolean usesLocalState;
private final boolean usesClusterState;
@ -65,6 +70,11 @@ public class MockStateManager implements StateManager {
}
}
public void reset() {
clusterStateMap = new MockStateMap(null, -1L);
localStateMap = new MockStateMap(null, -1L);
}
@Override
public synchronized void setState(final Map<String, String> state, final Scope scope) throws IOException {
verifyAnnotation(scope);
@ -88,12 +98,25 @@ public class MockStateManager implements StateManager {
private synchronized StateMap retrieveState(final Scope scope) {
verifyAnnotation(scope);
if (scope == Scope.CLUSTER) {
clusterRetrievedCount.incrementAndGet();
return clusterStateMap;
} else {
localRetrievedCount.incrementAndGet();
return localStateMap;
}
}
public long getRetrievalCount(final Scope scope) {
switch (scope) {
case CLUSTER:
return clusterRetrievedCount.get();
case LOCAL:
return localRetrievedCount.get();
default:
throw new IllegalArgumentException("Invalid scope: " + scope);
}
}
@Override
public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
verifyAnnotation(scope);
@ -136,7 +159,15 @@ public class MockStateManager implements StateManager {
}
}
public void setIgnoreAnnotations(final boolean ignore) {
this.ignoreAnnotations = ignore;
}
private void verifyAnnotation(final Scope scope) {
if (ignoreAnnotations) {
return;
}
// ensure that the @Stateful annotation is present with the appropriate Scope
if ((scope == Scope.LOCAL && !usesLocalState) || (scope == Scope.CLUSTER && !usesClusterState)) {
Assert.fail("Component is attempting to set or retrieve state with a scope of " + scope + " but does not declare that it will use "
@ -232,6 +263,14 @@ public class MockStateManager implements StateManager {
Assert.assertNotSame("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion());
}
/**
* Ensures that state was not set for any scope
*/
public void assertStateNotSet() {
assertStateNotSet(Scope.CLUSTER);
assertStateNotSet(Scope.LOCAL);
}
/**
* Ensures that the state was not set for the given scope
*

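A hedged sketch of exercising the new test hooks (assertStateNotSet() and getRetrievalCount()) from JUnit; CountingProcessor is the hypothetical processor sketched after the ProcessSession diff above.

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

public class CountingProcessorStateTest {

    @Test
    public void testStateWrittenThroughSession() {
        final TestRunner runner = TestRunners.newTestRunner(new CountingProcessor());
        final MockStateManager stateManager = runner.getStateManager();

        // Nothing has run yet, so neither scope should hold state.
        stateManager.assertStateNotSet();

        runner.run();

        stateManager.assertStateEquals("count", "1", Scope.CLUSTER);
        // The processor read its state at least once during the trigger.
        Assert.assertTrue(stateManager.getRetrievalCount(Scope.CLUSTER) >= 1);
    }
}
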
File: MockProcessSession.java

@ -16,6 +16,26 @@
*/
package org.apache.nifi.util;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@ -42,21 +62,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Assert;
public class MockProcessSession implements ProcessSession {
@ -79,6 +84,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
// A Map of FlowFiles to OutputStreams that have been created by calls to {@link #write(FlowFile)} and have not yet been closed.
private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>();
private final StateManager stateManager;
private boolean committed = false;
private boolean rolledback = false;
@ -87,15 +93,20 @@ public class MockProcessSession implements ProcessSession {
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
this(sharedState, processor, true);
this(sharedState, processor, true, new MockStateManager(processor));
}
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed) {
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final StateManager stateManager) {
this(sharedState, processor, true, stateManager);
}
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager) {
this.processor = processor;
this.enforceStreamsClosed = enforceStreamsClosed;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
this.provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
this.stateManager = stateManager;
}
@Override
@ -1315,6 +1326,26 @@ public class MockProcessSession implements ProcessSession {
return provenanceReporter;
}
@Override
public void setState(final Map<String, String> state, final Scope scope) throws IOException {
stateManager.setState(state, scope);
}
@Override
public StateMap getState(final Scope scope) throws IOException {
return stateManager.getState(scope);
}
@Override
public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
return stateManager.replace(oldValue, newValue, scope);
}
@Override
public void clearState(final Scope scope) throws IOException {
stateManager.clear(scope);
}
@Override
public MockFlowFile penalize(FlowFile flowFile) {
flowFile = validateState(flowFile);

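A small sketch of the new constructor wiring, assuming an arbitrary Processor instance. It also shows the design choice above: unlike StandardProcessSession, the mock delegates state calls straight through to the StateManager, so writes are visible without calling commit().

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;

public class MockSessionStateExample {

    public static void demo(final Processor processor) throws IOException {
        final MockStateManager stateManager = new MockStateManager(processor);
        stateManager.setIgnoreAnnotations(true); // skip @Stateful verification for this sketch

        final MockProcessSession session = new MockProcessSession(
                new SharedSessionState(processor, new AtomicLong(0L)), processor, stateManager);

        session.setState(Collections.singletonMap("offset", "42"), Scope.LOCAL);

        // No commit() needed: the mock forwards setState directly to the manager.
        stateManager.assertStateEquals("offset", "42", Scope.LOCAL);
    }
}
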
File: MockSessionFactory.java

@ -16,29 +16,33 @@
*/
package org.apache.nifi.util;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class MockSessionFactory implements ProcessSessionFactory {
private final Processor processor;
private final SharedSessionState sharedState;
private final Set<MockProcessSession> createdSessions = new CopyOnWriteArraySet<>();
private final boolean enforceReadStreamsClosed;
private final StateManager stateManager;
MockSessionFactory(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) {
MockSessionFactory(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed, final StateManager stateManager) {
this.sharedState = sharedState;
this.processor = processor;
this.enforceReadStreamsClosed = enforceReadStreamsClosed;
this.stateManager = stateManager;
}
@Override
public ProcessSession createSession() {
final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed);
final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed, stateManager);
createdSessions.add(session);
return session;
}

File: StandardProcessorTestRunner.java

@ -16,33 +16,6 @@
*/
package org.apache.nifi.util;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@ -73,6 +46,34 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import static java.util.Objects.requireNonNull;
public class StandardProcessorTestRunner implements TestRunner {
private final Processor processor;
@ -116,8 +117,8 @@ public class StandardProcessorTestRunner implements TestRunner {
this.idGenerator = new AtomicLong(0L);
this.sharedState = new SharedSessionState(processor, idGenerator);
this.flowFileQueue = sharedState.getFlowFileQueue();
this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed);
this.processorStateManager = new MockStateManager(processor);
this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager);
this.variableRegistry = new MockVariableRegistry();
this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry);
@ -141,7 +142,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void enforceReadStreamsClosed(final boolean enforce) {
enforceReadStreamsClosed = enforce;
this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed);
this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager);
}
@Override
@ -439,7 +440,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor, enforceReadStreamsClosed);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor, enforceReadStreamsClosed, processorStateManager);
MockFlowFile flowFile = session.create();
flowFile = session.importFrom(data, flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);

File: TestMockProcessSession.java

@ -16,15 +16,6 @@
*/
package org.apache.nifi.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -33,16 +24,27 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestMockProcessSession {
@Test
public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile flowFile = session.createFlowFile("hello, world".getBytes());
final InputStream in = session.read(flowFile);
final byte[] buffer = new byte[12];
@ -61,7 +63,7 @@ public class TestMockProcessSession {
@Test
public void testTransferUnknownRelationship() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
final Relationship fakeRel = new Relationship.Builder().name("FAKE").build();
try {
@ -82,7 +84,7 @@ public class TestMockProcessSession {
@Test(expected = IllegalArgumentException.class)
public void testRejectTransferNewlyCreatedFileToSelf() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
// this should throw an exception because we shouldn't allow a newly created flowfile to get routed back to self
session.transfer(ff1);
@ -91,7 +93,7 @@ public class TestMockProcessSession {
@Test
public void testKeepPenalizedStatusAfterPuttingAttribute(){
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
ff1 = session.penalize(ff1);
assertTrue(ff1.isPenalized());
@ -103,7 +105,7 @@ public class TestMockProcessSession {
@Test
public void testUnpenalizeFlowFile() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor));
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
ff1 = session.penalize(ff1);
assertTrue(ff1.isPenalized());

File: ListS3.java

@ -16,19 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.internal.Constants;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@ -70,8 +58,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@ -83,6 +69,20 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@PrimaryNodeOnly
@TriggerSerially
@ -251,8 +251,8 @@ public class ListS3 extends AbstractS3Processor {
public static final String CURRENT_KEY_PREFIX = "key-";
// State tracking
private long currentTimestamp = 0L;
private Set<String> currentKeys;
private volatile long currentTimestamp = 0L;
private volatile Set<String> currentKeys;
private static Validator createRequesterPaysValidator() {
return new Validator() {
@ -291,8 +291,8 @@ public class ListS3 extends AbstractS3Processor {
return keys;
}
private void restoreState(final ProcessContext context) throws IOException {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
private void restoreState(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX+"0") == null) {
currentTimestamp = 0L;
currentKeys = new HashSet<>();
@ -302,16 +302,18 @@ public class ListS3 extends AbstractS3Processor {
}
}
private void persistState(final ProcessContext context) {
Map<String, String> state = new HashMap<>();
state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
private void persistState(final ProcessSession session, final long timestamp, final Collection<String> keys) {
final Map<String, String> state = new HashMap<>();
state.put(CURRENT_TIMESTAMP, String.valueOf(timestamp));
int i = 0;
for (String key : currentKeys) {
state.put(CURRENT_KEY_PREFIX+i, key);
for (final String key : keys) {
state.put(CURRENT_KEY_PREFIX + i, key);
i++;
}
try {
context.getStateManager().setState(state, Scope.CLUSTER);
session.setState(state, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
}
@ -320,7 +322,7 @@ public class ListS3 extends AbstractS3Processor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
try {
restoreState(context);
restoreState(session);
} catch (IOException ioe) {
getLogger().error("Failed to restore processor state; yielding", ioe);
context.yield();
@ -445,17 +447,18 @@ public class ListS3 extends AbstractS3Processor {
return;
}
final Set<String> updatedKeys = new HashSet<>();
if (latestListedTimestampInThisCycle <= currentTimestamp) {
updatedKeys.addAll(currentKeys);
}
updatedKeys.addAll(listedKeys);
persistState(session, latestListedTimestampInThisCycle, updatedKeys);
session.commit();
// Update currentKeys.
if (latestListedTimestampInThisCycle > currentTimestamp) {
currentKeys.clear();
}
currentKeys.addAll(listedKeys);
// Update the StateManager with the most recent timestamp
currentKeys = updatedKeys;
currentTimestamp = latestListedTimestampInThisCycle;
persistState(context);
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});

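Because cluster state is a flat String-to-String map, ListS3 flattens its key set into indexed entries (key-0, key-1, ...). The following is a hedged sketch of reading that layout back, mirroring persistState above; the processor's actual restore logic may differ in detail.

import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.components.state.StateMap;

public final class ListingStateKeys {

    private static final String CURRENT_KEY_PREFIX = "key-";

    // Rebuild the key set from the contiguous indexed entries written by persistState.
    public static Set<String> readKeys(final StateMap stateMap) {
        final Set<String> keys = new HashSet<>();
        for (int i = 0; ; i++) {
            final String key = stateMap.get(CURRENT_KEY_PREFIX + i);
            if (key == null) {
                break; // indices are contiguous, so the first gap ends the set
            }
            keys.add(key);
        }
        return keys;
    }
}
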
File: CaptureChangeMySQL.java

@ -16,15 +16,6 @@
*/
package org.apache.nifi.cdc.mysql.processors;
import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.FORMAT_DESCRIPTION;
import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.GtidSet;
import com.github.shyiko.mysql.binlog.event.Event;
@ -34,36 +25,9 @@ import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -72,6 +36,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.cdc.CDCException;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.RowEventException;
@ -114,11 +80,50 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.FORMAT_DESCRIPTION;
import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
/**
* A processor to retrieve Change Data Capture (CDC) events and send them as flow files.
*/
@TriggerSerially
@PrimaryNodeOnly
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "jdbc", "cdc", "mysql"})
@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
@ -291,7 +296,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
.name("capture-change-mysql-state-update-interval")
.displayName("State Update Interval")
.description("Indicates how often to update the processor's state with binlog file/position values. A value of zero means that state will only be updated when the processor is "
.description("DEPRECATED. This property is no longer used and exists solely for backward compatibility purposes. Indicates how often to update the processor's state with binlog "
+ "file/position values. A value of zero means that state will only be updated when the processor is "
+ "stopped or shutdown. If at some point the processor state does not contain the desired binlog values, the last flow file emitted will contain the last observed values, "
+ "and the processor can be returned to that state by using the Initial Binlog File, Initial Binlog Position, and Initial Sequence ID properties.")
.defaultValue("0 seconds")
@ -397,8 +403,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private int currentHost = 0;
private String transitUri = "<unknown>";
private volatile long lastStateUpdate = 0L;
private volatile long stateUpdateInterval = -1L;
private final AtomicLong currentSequenceId = new AtomicLong(0);
private volatile DistributedMapCacheClient cacheClient = null;
@ -455,6 +459,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return propDescriptors;
}
@OnPrimaryNodeStateChange
public synchronized void onPrimaryNodeChange(final PrimaryNodeState state) throws CDCException {
if (state == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
stop();
}
}
public void setup(ProcessContext context) {
final ComponentLog logger = getLogger();
@ -477,8 +488,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
PropertyValue tableNameValue = context.getProperty(TABLE_NAME_PATTERN);
tableNamePattern = tableNameValue.isSet() ? Pattern.compile(tableNameValue.getValue()) : null;
stateUpdateInterval = context.getProperty(STATE_UPDATE_INTERVAL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
@ -579,12 +588,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// Indicate that this processor has executed at least once, so we know whether or not the state values are valid and should be updated
hasRun.set(true);
ComponentLog log = getLogger();
StateManager stateManager = context.getStateManager();
// Create a client if we don't have one
if (binlogClient == null) {
@ -599,7 +607,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Communications failure, disconnect and try next time
log.error("Binlog connector communications failure: " + e.getMessage(), e);
try {
stop(stateManager);
stop();
} catch (CDCException ioe) {
throw new ProcessException(ioe);
}
@ -615,14 +623,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
try {
outputEvents(currentSession, stateManager, log);
long now = System.currentTimeMillis();
long timeSinceLastUpdate = now - lastStateUpdate;
if (stateUpdateInterval != 0 && timeSinceLastUpdate >= stateUpdateInterval) {
updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
lastStateUpdate = now;
}
outputEvents(currentSession, log);
} catch (IOException ioe) {
try {
// Perform some processor-level "rollback", then rollback the session
@ -631,7 +632,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentSequenceId.set(xactSequenceId);
currentGtidSet = xactGtidSet;
inTransaction = false;
stop(stateManager);
stop();
queue.clear();
currentSession.rollback();
} catch (Exception e) {
@ -643,19 +644,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
@OnStopped
@OnShutdown
public void onStopped(ProcessContext context) {
try {
stop(context.getStateManager());
} catch (CDCException ioe) {
throw new ProcessException(ioe);
}
}
@OnShutdown
public void onShutdown(ProcessContext context) {
try {
// In case we get shutdown while still running, save off the current state, disconnect, and shut down gracefully
stop(context.getStateManager());
stop();
} catch (CDCException ioe) {
throw new ProcessException(ioe);
}
@ -777,7 +769,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
public void outputEvents(ProcessSession session, StateManager stateManager, ComponentLog log) throws IOException {
public void outputEvents(ProcessSession session, ComponentLog log) throws IOException {
RawBinlogEvent rawBinlogEvent;
// Drain the queue
@ -869,11 +861,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
updateState(session);
// Commit the NiFi session
session.commit();
inTransaction = false;
currentTable = null;
} else {
// Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " ");
@ -899,6 +893,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
// If not in a transaction, commit the session so the DDL event(s) will be transferred
if (includeDDLEvents && !inTransaction) {
updateState(session);
session.commit();
}
}
@ -917,6 +912,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
updateState(session);
session.commit();
inTransaction = false;
currentTable = null;
@ -980,6 +976,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentBinlogFile = rotateEventData.getBinlogFilename();
currentBinlogPosition = rotateEventData.getBinlogPosition();
}
updateState(session);
break;
case GTID:
@ -988,6 +985,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
GtidEventData gtidEventData = event.getData();
gtidSet.add(gtidEventData.getGtid());
currentGtidSet = gtidSet.toString();
updateState(session);
}
break;
@ -1004,7 +1002,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
protected void stop(StateManager stateManager) throws CDCException {
protected void clearState() throws IOException {
if (currentSession == null) {
throw new IllegalStateException("No current session");
}
currentSession.clearState(Scope.CLUSTER);
}
protected void stop() throws CDCException {
try {
if (binlogClient != null) {
binlogClient.disconnect();
@ -1015,13 +1021,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
binlogClient.unregisterEventListener(eventListener);
}
}
doStop.set(true);
if (hasRun.getAndSet(false)) {
updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
if (currentSession != null) {
currentSession.commit();
}
currentBinlogPosition = -1;
doStop.set(true);
currentBinlogPosition = -1;
} catch (IOException e) {
throw new CDCException("Error closing CDC connection", e);
} finally {
@ -1033,25 +1039,27 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
private void updateState(ProcessSession session) throws IOException {
updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet);
}
private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet) throws IOException {
// Update state with latest values
if (stateManager != null) {
Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
final Map<String, String> newStateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
// Save current binlog filename, position and GTID to the state map
if (binlogFile != null) {
newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
}
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
if (gtidSet != null) {
newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
}
stateManager.setState(newStateMap, Scope.CLUSTER);
// Save current binlog filename, position and GTID to the state map
if (binlogFile != null) {
newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
}
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
if (gtidSet != null) {
newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
}
session.setState(newStateMap, Scope.CLUSTER);
}

File: CaptureChangeMySQLTest.groovy

@ -824,12 +824,6 @@ class CaptureChangeMySQLTest {
testRunner.run(1, false, false)
// Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
// Stop the processor and verify the state is set
testRunner.run(1, true, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
@ -857,7 +851,7 @@ class CaptureChangeMySQLTest {
testRunner.run(1, false, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 'master.000001', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '4', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
// COMMIT
@ -902,20 +896,15 @@ class CaptureChangeMySQLTest {
{} as EventData
))
testRunner.run(1, false, false)
// Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, null, Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, null, Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER)
testRunner.run(1, true, false)
// Stop the processor and verify the state is set
testRunner.run(1, true, false)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '-1000', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
testRunner.stateManager.clear(Scope.CLUSTER)
((CaptureChangeMySQL) testRunner.getProcessor()).clearState();
testRunner.stateManager.clear(Scope.CLUSTER);
// Send some events, wait for the State Update Interval, and verify the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 second')

File: ScrollElasticsearchHttp.java

@ -215,7 +215,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
throws ProcessException {
try {
if (isQueryFinished(context.getStateManager())) {
if (isQueryFinished(session)) {
getLogger().trace(
"Query has been marked finished in the state manager. "
+ "To run another query, clear the state.");
@ -252,7 +252,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
final ComponentLog logger = getLogger();
try {
String scrollId = loadScrollId(context.getStateManager());
String scrollId = loadScrollId(session);
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions()
@ -347,7 +347,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
});
session.transfer(flowFile, REL_SUCCESS);
saveScrollId(context.getStateManager(), scrollId);
session.setState(Collections.singletonMap(SCROLL_ID_STATE, scrollId), Scope.LOCAL);
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
@ -367,8 +367,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
}
}
private boolean isQueryFinished(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
private boolean isQueryFinished(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.LOCAL);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");
@ -381,8 +381,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
return "true".equals(isQueryFinished);
}
private String loadScrollId(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
private String loadScrollId(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.LOCAL);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");
@ -404,15 +404,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
stateManager.setState(state, Scope.LOCAL);
}
private void saveScrollId(StateManager stateManager, String scrollId) throws IOException {
Map<String, String> state = new HashMap<>(2);
state.put(SCROLL_ID_STATE, scrollId);
getLogger().debug("Saving state with scrollId of {}", new Object[] { scrollId });
stateManager.setState(state, Scope.LOCAL);
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {

File: AbstractListProcessor.java

@ -395,21 +395,32 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (minTimestamp != null) {
persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
final Map<String, String> updatedState = createStateMap(minTimestamp, minTimestamp, latestIdentifiersProcessed);
stateManager.setState(updatedState, scope);
}
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
private Map<String, String> createStateMap(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
final StateManager stateManager, final Scope scope) throws IOException {
final List<String> processedIdentifiesWithLatestTimestamp) throws IOException {
final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
}
stateManager.setState(updatedState, scope);
return updatedState;
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
final ProcessSession session, final Scope scope) throws IOException {
final Map<String, String> updatedState = createStateMap(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, processedIdentifiesWithLatestTimestamp);
session.setState(updatedState, scope);
}
protected String getKey(final String directory) {
@ -444,7 +455,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
boolean noUpdateRequired = false;
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final StateMap stateMap = session.getState(getStateScope(context));
latestIdentifiersProcessed.clear();
for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
final String k = state.getKey();
@ -587,6 +598,25 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
if (latestListedEntryTimestampThisCycleMillis != null) {
boolean processedNewFiles = entitiesListed > 0;
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
// that we evaluated. We do this in order to avoid pulling in the same file twice.
// However, we want to save the state both locally and remotely.
// We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
// previously Primary Node left off.
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
try {
lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
}
}
if (processedNewFiles) {
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
@ -604,25 +634,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
lastRunTimeNanos = currentRunTimeNanos;
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
// that we evaluated. We do this in order to avoid pulling in the same file twice.
// However, we want to save the state both locally and remotely.
// We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
// previously Primary Node left off.
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
try {
lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
}
}
} else {
getLogger().debug("There is no data to list. Yielding.");
context.yield();

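For illustration, the flattened map built by createStateMap for two tracked identifiers would look roughly as follows; the literal key names are assumptions inferred from the constants referenced above, and the values are invented.

import java.util.LinkedHashMap;
import java.util.Map;

public class ListingStateExample {

    public static void main(final String[] args) {
        final Map<String, String> state = new LinkedHashMap<>();
        state.put("listing.timestamp", "1610540410000");   // LATEST_LISTED_ENTRY_TIMESTAMP_KEY (assumed literal)
        state.put("processed.timestamp", "1610540400000"); // LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY (assumed literal)
        state.put("id.0", "bucket/key-a.txt");             // IDENTIFIER_PREFIX + "." + 0 (assumed literal)
        state.put("id.1", "bucket/key-b.txt");
        state.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
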
File: AbstractRepositoryContext.java

@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@ -45,10 +46,12 @@ public abstract class AbstractRepositoryContext implements RepositoryContext {
private final CounterRepository counterRepo;
private final ProvenanceEventRepository provenanceRepo;
private final AtomicLong connectionIndex;
private final StateManager stateManager;
public AbstractRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository,
final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository,
final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) {
final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository,
final StateManager stateManager) {
this.connectable = connectable;
contentRepo = contentRepository;
flowFileRepo = flowFileRepository;
@ -57,6 +60,7 @@ public abstract class AbstractRepositoryContext implements RepositoryContext {
provenanceRepo = provenanceRepository;
this.connectionIndex = connectionIndex;
this.stateManager = stateManager;
}
@Override
@ -251,4 +255,9 @@ public abstract class AbstractRepositoryContext implements RepositoryContext {
final String componentType = getProvenanceComponentDescription();
return new StandardProvenanceReporter(flowfileKnownCheck, getConnectable().getIdentifier(), componentType, getProvenanceRepository(), eventEnricher);
}
@Override
public StateManager getStateManager() {
return stateManager;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
@ -60,4 +61,6 @@ public interface RepositoryContext {
void adjustCounter(String name, long delta);
ProvenanceEventBuilder createProvenanceEventBuilder();
StateManager getStateManager();
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
@ -34,6 +37,7 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
@ -104,6 +108,7 @@ import java.util.stream.Collectors;
public class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
private static final AtomicLong idGenerator = new AtomicLong(0L);
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
private static final StateMap EMPTY_STATE_MAP = new StandardStateMap(Collections.emptyMap(), -1L);
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
@ -161,6 +166,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private Checkpoint checkpoint = null;
private final ContentClaimWriteCache claimCache;
private StateMap localState;
private StateMap clusterState;
public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) {
this.context = context;
this.taskTermination = taskTermination;
@ -442,8 +450,44 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
LOG.debug(timingInfo.toString());
}
// Update local state
final StateManager stateManager = context.getStateManager();
if (checkpoint.localState != null) {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
if (stateMap.getVersion() < checkpoint.localState.getVersion()) {
LOG.debug("Updating State Manager's Local State");
try {
stateManager.setState(checkpoint.localState.toMap(), Scope.LOCAL);
} catch (final Exception e) {
LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
} else {
LOG.debug("Will not update State Manager's Local State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateMap.getVersion(), checkpoint.localState.getVersion());
}
}
// Update cluster state
if (checkpoint.clusterState != null) {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
if (stateMap.getVersion() < checkpoint.clusterState.getVersion()) {
LOG.debug("Updating State Manager's Cluster State");
try {
stateManager.setState(checkpoint.clusterState.toMap(), Scope.CLUSTER);
} catch (final Exception e) {
LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
} else {
LOG.debug("Will not update State Manager's Cluster State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateMap.getVersion(), checkpoint.clusterState.getVersion());
}
}
} catch (final Exception e) {
LOG.error("Failed to commit session {}. Will roll back.", e, this);
LOG.error("Failed to commit session {}. Will roll back.", this, e);
try {
// if we fail to commit the session, we need to roll back
@ -462,6 +506,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
}
private void updateEventRepository(final Checkpoint checkpoint) {
try {
// update event repository
@ -920,6 +965,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
LOG.warn("{} Attempted to close Output Stream for {} due to session rollback but close failed", this, this.connectableDescription, e1);
}
if (localState != null || clusterState != null) {
LOG.debug("Rolling back session that has state stored. This state will not be updated.");
}
if (rollbackCheckpoint && checkpoint != null && (checkpoint.localState != null || checkpoint.clusterState != null)) {
LOG.debug("Rolling back checkpoint that has state stored. This state will not be updated.");
}
// Gather all of the StandardRepositoryRecords that we need to operate on.
// If we are rolling back the checkpoint, we must create a copy of the Collection so that we can merge the
// session's records with the checkpoint's. Otherwise, we can operate on the session's records directly.
@ -1132,6 +1184,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
forkEventBuilders.clear();
provenanceReporter.clear();
localState = null;
clusterState = null;
processingStartTime = System.nanoTime();
}
@ -3321,6 +3376,74 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
return provenanceReporter;
}
@Override
public void setState(final Map<String, String> state, final Scope scope) throws IOException {
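// Bump the version past the last value known to this session; the new state is cached
// in the session and published to the State Provider only when the session commits.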
final long currentVersion = getState(scope).getVersion();
final StateMap stateMap = new StandardStateMap(state, currentVersion + 1);
setState(stateMap, scope);
}
private void setState(final StateMap stateMap, final Scope scope) {
if (scope == Scope.LOCAL) {
localState = stateMap;
} else {
clusterState = stateMap;
}
}
@Override
public StateMap getState(final Scope scope) throws IOException {
if (scope == Scope.LOCAL) {
if (localState != null) {
return localState;
}
if (checkpoint != null && checkpoint.localState != null) {
return checkpoint.localState;
}
// If no state is held locally, get it from the State Manager.
return context.getStateManager().getState(scope);
}
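// Cluster scope uses the same lookup order: session state, then checkpoint, then the State Manager.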
if (clusterState != null) {
return clusterState;
}
if (checkpoint != null && checkpoint.clusterState != null) {
return checkpoint.clusterState;
}
return context.getStateManager().getState(scope);
}
@Override
public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
final StateMap current = getState(scope);
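// A version of -1 means no state has ever been stored, so a null or never-set oldValue also matches.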
if (current.getVersion() == -1 && (oldValue == null || oldValue.getVersion() == -1)) {
final StateMap stateMap = new StandardStateMap(newValue, 1L);
setState(stateMap, scope);
return true;
}
if (oldValue == null) {
return false;
}
if (current.getVersion() == oldValue.getVersion() && current.toMap().equals(oldValue.toMap())) {
final StateMap stateMap = new StandardStateMap(newValue, current.getVersion() + 1);
setState(stateMap, scope);
return true;
}
return false;
}
@Override
public void clearState(final Scope scope) {
setState(EMPTY_STATE_MAP, scope);
}
@Override
public String toString() {
return "StandardProcessSession[id=" + sessionId + "]";
@ -3364,6 +3487,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private long bytesReceived = 0L, bytesSent = 0L;
private boolean initialized = false;
private StateMap localState;
private StateMap clusterState;
private void initializeForCopy() {
if (initialized) {
@ -3430,6 +3555,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
this.bytesReceived = session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched();
this.flowFilesSent = session.provenanceReporter.getFlowFilesSent();
this.bytesSent = session.provenanceReporter.getBytesSent();
if (session.localState != null) {
this.localState = session.localState;
}
if (session.clusterState != null) {
this.clusterState = session.clusterState;
}
}
/**
@ -3470,6 +3602,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
this.bytesReceived += session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched();
this.flowFilesSent += session.provenanceReporter.getFlowFilesSent();
this.bytesSent += session.provenanceReporter.getBytesSent();
if (session.localState != null) {
this.localState = session.localState;
}
if (session.clusterState != null) {
this.clusterState = session.clusterState;
}
}
private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<? super V, ? super V, ? extends V> merger) {
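Taken together, the new methods give processors transactional access to component state: getState prefers the value cached in the session (or its checkpoint), and setState only caches the value until the session commits, at which point it is published to the State Provider. A minimal, hedged sketch of the intended usage follows; the processor class and the "offset" state key are hypothetical and not part of this commit.

// Hedged sketch, not part of this commit: the processor name and the "offset"
// state key are hypothetical.
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ExampleStatefulProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try {
            // Prefers state cached in this session or its checkpoint, then falls back to the State Manager.
            final StateMap stateMap = session.getState(Scope.CLUSTER);
            final long lastOffset = Long.parseLong(stateMap.toMap().getOrDefault("offset", "0"));

            // ... fetch data newer than lastOffset and transfer FlowFiles ...

            final Map<String, String> newState = new HashMap<>(stateMap.toMap());
            newState.put("offset", String.valueOf(lastOffset + 1));
            // Cached in the session; published to the State Provider only when the session commits.
            session.setState(newState, Scope.CLUSTER);
        } catch (final IOException e) {
            throw new ProcessException("Failed to access component state", e);
        }
    }
}

Because AbstractProcessor commits the session after onTrigger returns, the state update and the FlowFile transfers succeed or fail together; a rollback discards the cached state.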

View File

@ -16,39 +16,6 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@ -227,6 +194,40 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
@ -522,7 +523,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
parameterContextManager = new StandardParameterContextManager();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository, parameterContextManager);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
@ -901,7 +902,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// Begin expiring FlowFiles that are old
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
// now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the

View File

@ -16,15 +16,8 @@
*/
package org.apache.nifi.controller.repository;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.FlowFileFilter;
@ -36,6 +29,16 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
public class BatchingSessionFactory implements ProcessSessionFactory {
private final HighThroughputSession highThroughputSession;
@ -252,6 +255,26 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
return session.getProvenanceReporter();
}
@Override
public void setState(final Map<String, String> state, final Scope scope) throws IOException {
session.setState(state, scope);
}
@Override
public StateMap getState(final Scope scope) throws IOException {
return session.getState(scope);
}
@Override
public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
return session.replaceState(oldValue, newValue, scope);
}
@Override
public void clearState(final Scope scope) {
session.clearState(scope);
}
@Override
public OutputStream write(final FlowFile source) {
return session.write(source);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.claim.StandardContentClaimWriteCache;
@ -26,8 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
public class StandardRepositoryContext extends AbstractRepositoryContext implements RepositoryContext {
public StandardRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) {
super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository);
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository,
final StateManager stateManager) {
super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository, stateManager);
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
@ -34,20 +36,23 @@ public class RepositoryContextFactory {
private final FlowFileEventRepository flowFileEventRepo;
private final CounterRepository counterRepo;
private final ProvenanceRepository provenanceRepo;
private final StateManagerProvider stateManagerProvider;
public RepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository,
final ProvenanceRepository provenanceRepository) {
final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider) {
this.contentRepo = contentRepository;
this.flowFileRepo = flowFileRepository;
this.flowFileEventRepo = flowFileEventRepository;
this.counterRepo = counterRepository;
this.provenanceRepo = provenanceRepository;
this.stateManagerProvider = stateManagerProvider;
}
public RepositoryContext newProcessContext(final Connectable connectable, final AtomicLong connectionIndex) {
return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager);
}
public ContentRepository getContentRepository() {

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@ -47,6 +49,7 @@ import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
@ -111,6 +114,7 @@ public class StandardProcessSessionIT {
private FlowFileQueue flowFileQueue;
private StandardRepositoryContext context;
private Connectable connectable;
private MockStateManager stateManager;
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
@ -198,7 +202,10 @@ public class StandardProcessSessionIT {
contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository(contentRepo);
context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo);
stateManager = new MockStateManager(connectable);
stateManager.setIgnoreAnnotations(true);
context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager);
session = new StandardProcessSession(context, () -> false);
}
@ -2359,6 +2366,144 @@ public class StandardProcessSessionIT {
flowFile = session.putAttribute(flowFile, "counter", "4");
}
@Test
public void testStateStoredOnCommit() throws IOException {
session.commit();
stateManager.assertStateNotSet();
session.setState(Collections.singletonMap("abc", "123"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.commit();
stateManager.assertStateEquals("abc", "123", Scope.LOCAL);
stateManager.assertStateNotSet(Scope.CLUSTER);
stateManager.reset();
session.setState(Collections.singletonMap("abc", "123"), Scope.CLUSTER);
stateManager.assertStateNotSet();
session.commit();
stateManager.assertStateEquals("abc", "123", Scope.CLUSTER);
stateManager.assertStateNotSet(Scope.LOCAL);
}
@Test
public void testStateStoreFailure() throws IOException {
stateManager.setFailOnStateSet(Scope.LOCAL, true);
session.setState(Collections.singletonMap("abc", "123"), Scope.LOCAL);
stateManager.assertStateNotSet();
// Should not throw exception
session.commit();
// No longer fail on state updates
stateManager.setFailOnStateSet(Scope.LOCAL, false);
session.commit();
stateManager.assertStateNotSet();
session.setState(Collections.singletonMap("abc", "123"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.commit();
stateManager.assertStateEquals("abc", "123", Scope.LOCAL);
stateManager.assertStateNotSet(Scope.CLUSTER);
}
@Test
public void testStateRetrievedHasVersion() throws IOException {
StateMap retrieved = session.getState(Scope.LOCAL);
assertNotNull(retrieved);
assertEquals(-1, retrieved.getVersion());
assertEquals(1, stateManager.getRetrievalCount(Scope.LOCAL));
assertEquals(0, stateManager.getRetrievalCount(Scope.CLUSTER));
session.setState(Collections.singletonMap("abc", "123"), Scope.LOCAL);
stateManager.assertStateNotSet();
retrieved = session.getState(Scope.LOCAL);
assertNotNull(retrieved);
assertEquals(0, retrieved.getVersion());
assertEquals(Collections.singletonMap("abc", "123"), retrieved.toMap());
session.setState(Collections.singletonMap("abc", "222"), Scope.LOCAL);
retrieved = session.getState(Scope.LOCAL);
assertNotNull(retrieved);
assertEquals(1, retrieved.getVersion());
session.commit();
stateManager.assertStateEquals("abc", "222", Scope.LOCAL);
stateManager.assertStateNotSet(Scope.CLUSTER);
retrieved = session.getState(Scope.LOCAL);
assertNotNull(retrieved);
assertEquals(1, retrieved.getVersion());
}
@Test
public void testRollbackDoesNotStoreState() throws IOException {
session.setState(Collections.singletonMap("abc", "1"), Scope.LOCAL);
session.rollback();
stateManager.assertStateNotSet();
session.commit();
stateManager.assertStateNotSet();
}
@Test
public void testCheckpointedStateStoredOnCommit() throws IOException {
session.setState(Collections.singletonMap("abc", "1"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.checkpoint();
session.commit();
stateManager.assertStateEquals("abc", "1", Scope.LOCAL);
}
@Test
public void testSessionStateStoredOverCheckpoint() throws IOException {
session.setState(Collections.singletonMap("abc", "1"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.checkpoint();
session.setState(Collections.singletonMap("abc", "2"), Scope.LOCAL);
session.commit();
stateManager.assertStateEquals("abc", "2", Scope.LOCAL);
}
@Test
public void testRollbackAfterCheckpointStoresState() throws IOException {
session.setState(Collections.singletonMap("abc", "1"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.checkpoint();
session.setState(Collections.singletonMap("abc", "2"), Scope.LOCAL);
session.rollback();
stateManager.assertStateNotSet();
session.commit();
stateManager.assertStateEquals("abc", "1", Scope.LOCAL);
}
@Test
public void testFullRollbackAfterCheckpointDoesNotStoreState() throws IOException {
session.setState(Collections.singletonMap("abc", "1"), Scope.LOCAL);
stateManager.assertStateNotSet();
session.checkpoint();
session.setState(Collections.singletonMap("abc", "2"), Scope.LOCAL);
session.rollback(true, true);
session.commit();
stateManager.assertStateNotSet();
}
private static class MockFlowFileRepository implements FlowFileRepository {

View File

@ -179,7 +179,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
flowFileRepo = new StatelessFlowFileRepository();
final CounterRepository counterRepo = new StandardCounterRepository();
final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
counterRepo, provenanceRepo, stateManagerProvider);
final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
repositoryContextFactory);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.stateless.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.AbstractRepositoryContext;
import org.apache.nifi.controller.repository.ContentRepository;
@ -33,8 +34,9 @@ public class StatelessRepositoryContext extends AbstractRepositoryContext implem
private final ContentClaimWriteCache writeCache;
public StatelessRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) {
super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository);
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository,
final StateManager stateManager) {
super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository, stateManager);
writeCache = new StatelessContentClaimWriteCache(contentRepository);
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.stateless.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
@ -38,19 +40,23 @@ public class StatelessRepositoryContextFactory implements RepositoryContextFacto
private final FlowFileEventRepository flowFileEventRepository;
private final CounterRepository counterRepository;
private final ProvenanceEventRepository provenanceEventRepository;
private final StateManagerProvider stateManagerProvider;
public StatelessRepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository,
final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) {
final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository, final StateManagerProvider stateManagerProvider) {
this.contentRepository = contentRepository;
this.flowFileRepository = flowFileRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.counterRepository = counterRepository;
this.provenanceEventRepository = provenanceRepository;
this.stateManagerProvider = stateManagerProvider;
}
@Override
public RepositoryContext createRepositoryContext(final Connectable connectable) {
return new StatelessRepositoryContext(connectable, new AtomicLong(0L), contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceEventRepository);
final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
return new StatelessRepositoryContext(connectable, new AtomicLong(0L), contentRepository, flowFileRepository,
flowFileEventRepository, counterRepository, provenanceEventRepository, stateManager);
}
@Override

View File

@ -228,8 +228,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.collect(Collectors.toSet());
}
void restoreState(final ProcessContext context) throws IOException {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
void restoreState(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX + "0") == null) {
currentTimestamp = 0L;
currentKeys.clear();
@ -240,7 +240,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
}
void persistState(final ProcessContext context, final long timestamp, final Set<String> keys) {
void persistState(final ProcessSession session, final long timestamp, final Set<String> keys) {
final Map<String, String> state = new HashMap<>();
state.put(CURRENT_TIMESTAMP, String.valueOf(timestamp));
@ -251,7 +251,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
try {
context.getStateManager().setState(state, Scope.CLUSTER);
session.setState(state, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
}
@ -266,9 +266,9 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
restoreState(context);
restoreState(session);
} catch (IOException e) {
getLogger().error("Failed to restore processor state; yielding", e);
context.yield();
@ -292,7 +292,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
final Storage storage = getCloudService();
long maxTimestamp = 0L;
final Set<String> maxKeys = new HashSet<>();
final Set<String> keysMatchingTimestamp = new HashSet<>();
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@ -321,17 +321,17 @@ public class ListGCSBucket extends AbstractGCSProcessor {
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
maxKeys.clear();
keysMatchingTimestamp.clear();
}
if (lastModified == maxTimestamp) {
maxKeys.add(blob.getName());
keysMatchingTimestamp.add(blob.getName());
}
listCount++;
}
if (writer.isCheckpoint()) {
commit(session, listCount);
commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
listCount = 0;
}
@ -339,16 +339,12 @@ public class ListGCSBucket extends AbstractGCSProcessor {
} while (blobPage != null);
writer.finishListing();
commit(session, listCount);
if (maxTimestamp != 0) {
currentTimestamp = maxTimestamp;
currentKeys.clear();
currentKeys.addAll(maxKeys);
persistState(context, currentTimestamp, currentKeys);
} else {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
if (maxTimestamp == 0) {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
context.yield();
} else {
commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
}
} catch (final Exception e) {
getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
@ -362,8 +358,13 @@ public class ListGCSBucket extends AbstractGCSProcessor {
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
}
private void commit(final ProcessSession session, int listCount) {
private void commit(final ProcessSession session, final int listCount, final long timestamp, final Set<String> keysMatchingTimestamp) {
if (listCount > 0) {
currentTimestamp = timestamp;
currentKeys.clear();
currentKeys.addAll(keysMatchingTimestamp);
persistState(session, currentTimestamp, currentKeys);
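// Persisting through the session stores the listing state atomically with the commit below.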
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
session.commit();
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -133,7 +134,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("Cluster StateMap should be fresh (version -1L)", -1L, runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion());
assertTrue(processor.getStateKeys().isEmpty());
processor.restoreState(runner.getProcessContext());
processor.restoreState(runner.getProcessSessionFactory().createSession());
assertTrue(processor.getStateKeys().isEmpty());
assertEquals(0L, processor.getStateTimestamp());
@ -161,7 +162,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertTrue(processor.getStateKeys().isEmpty());
assertEquals(0L, processor.getStateTimestamp());
processor.restoreState(runner.getProcessContext());
processor.restoreState(runner.getProcessSessionFactory().createSession());
assertNotNull(processor.getStateKeys());
assertTrue(processor.getStateKeys().contains("test-key-0"));
@ -183,7 +184,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
);
final Set<String> keys = ImmutableSet.of("test-key-0", "test-key-1");
processor.persistState(runner.getProcessContext(), 4L, keys);
final ProcessSession session = runner.getProcessSessionFactory().createSession();
processor.persistState(session, 4L, keys);
final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals("Cluster StateMap should have been written to", 1L, stateMap.getVersion());
@ -212,7 +214,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertTrue(runner.getLogger().getErrorMessages().isEmpty());
processor.persistState(runner.getProcessContext(), 4L, keys);
final ProcessSession session = runner.getProcessSessionFactory().createSession();
processor.persistState(session, 4L, keys);
// The method should have caught the error and reported it to the logger.
final List<LogMessage> logMessages = runner.getLogger().getErrorMessages();

View File

@ -16,11 +16,10 @@
*/
package org.apache.nifi.processors.groovyx.flow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.FlowFileFilter;
import groovy.lang.Closure;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import java.util.List;

View File

@ -16,17 +16,8 @@
*/
package org.apache.nifi.processors.groovyx.flow;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.FlowFileFilter;
@ -39,9 +30,20 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.groovyx.util.Throwables;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.groovyx.util.Throwables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
/**
* A wrapped session that collects all created/modified FlowFiles when created with a special flag.
@ -966,4 +968,23 @@ public abstract class ProcessSessionWrap implements ProcessSession {
return s.write(source);
}
@Override
public void setState(final Map<String, String> state, final Scope scope) throws IOException {
s.setState(state, scope);
}
@Override
public StateMap getState(final Scope scope) throws IOException {
return s.getState(scope);
}
@Override
public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
return s.replaceState(oldValue, newValue, scope);
}
@Override
public void clearState(final Scope scope) throws IOException {
s.clearState(scope);
}
}

View File

@ -406,7 +406,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
@ -494,7 +494,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
getLogger().debug("New state map: {}", new Object[] {updatedState});
try {
context.getStateManager().setState(updatedState, Scope.CLUSTER);
session.setState(updatedState, Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
}

View File

@ -35,7 +35,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -169,10 +168,8 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final StateManager stateManager = context.getStateManager();
try {
StateMap state = stateManager.getState(Scope.CLUSTER);
StateMap state = session.getState(Scope.CLUSTER);
String txIdAsString = state.get(LAST_TX_ID);
if (txIdAsString != null && !"".equals(txIdAsString)) {
@ -237,7 +234,7 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
"Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream: {}", new Object[]{e});
}
updateClusterStateForTxId(stateManager);
updateClusterStateForTxId(session);
}
private EventBatch getEventBatch(DFSInotifyEventInputStream eventStream, long duration, TimeUnit timeUnit, int retries) throws IOException, InterruptedException, MissingEventsException {
@ -259,11 +256,11 @@ public class GetHDFSEvents extends AbstractHadoopProcessor {
}
}
private void updateClusterStateForTxId(StateManager stateManager) {
private void updateClusterStateForTxId(final ProcessSession session) {
try {
Map<String, String> newState = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
Map<String, String> newState = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
newState.put(LAST_TX_ID, String.valueOf(lastTxId));
stateManager.setState(newState, Scope.CLUSTER);
session.setState(newState, Scope.CLUSTER);
} catch (IOException e) {
getLogger().warn("Failed to update cluster state for last txId. It is possible data replication may occur.", e);
}

View File

@ -16,6 +16,41 @@
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@ -37,42 +72,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@ -211,7 +210,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
final ScanResult scanResult = getState(client);
if (scanResult != null) {
storeState(scanResult, context.getStateManager());
context.getStateManager().setState(scanResult.toFlatMap(), Scope.CLUSTER);
}
clearState(client);
@ -260,7 +259,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
// if the table was changed then remove any previous state
if (previousTable != null && !tableName.equals(previousTable)) {
try {
context.getStateManager().clear(Scope.CLUSTER);
session.clearState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to clear Cluster State", ioe);
}
@ -271,7 +270,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
final RowSerializer serializer = new JsonRowSerializer(charset);
this.lastResult = getState(context.getStateManager());
this.lastResult = getState(session);
final long defaultMinTime = (initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis());
final long minTime = (lastResult == null ? defaultMinTime : lastResult.getTimestamp());
@ -377,10 +376,10 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
// Commit session before we replace the lastResult; if session commit fails, we want
// to pull these records again.
session.commit();
if (lastResult == null || scanResults.getTimestamp() > lastResult.getTimestamp()) {
session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
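// State set on the session is persisted only if the commit below succeeds.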
session.commit();
lastResult = scanResults;
} else if (scanResults.getTimestamp() == lastResult.getTimestamp()) {
final Map<String, Set<String>> combinedResults = new HashMap<>(scanResults.getMatchingCells());
@ -400,12 +399,13 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
existing.addAll(entry.getValue());
}
}
final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults);
session.setState(scanResult.toFlatMap(), Scope.CLUSTER);
session.commit();
lastResult = scanResult;
}
// save state using the framework's state manager
storeState(lastResult, context.getStateManager());
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
@ -437,11 +437,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
return columns;
}
private void storeState(final ScanResult scanResult, final StateManager stateManager) throws IOException {
stateManager.setState(scanResult.toFlatMap(), Scope.CLUSTER);
}
private void clearState(final DistributedMapCacheClient client) {
final File localState = getStateFile();
if (localState.exists()) {
@ -458,8 +453,8 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
}
private ScanResult getState(final StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
private ScanResult getState(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (stateMap.getVersion() < 0) {
return null;
}

View File

@ -250,8 +250,9 @@ public class GetSolr extends SolrProcessor {
stateMapHasChanged.set(true);
}
if (stateMapHasChanged.get())
if (stateMapHasChanged.get()) {
context.getStateManager().setState(stateMap, Scope.CLUSTER);
}
id_field = null;
}
@ -302,7 +303,7 @@ public class GetSolr extends SolrProcessor {
final String dateField = context.getProperty(DATE_FIELD).getValue();
final Map<String,String> stateMap = new HashMap<String,String>();
stateMap.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap());
stateMap.putAll(session.getState(Scope.CLUSTER).toMap());
solrQuery.setQuery("*:*");
final String query = context.getProperty(SOLR_QUERY).getValue();
@ -409,13 +410,14 @@ public class GetSolr extends SolrProcessor {
}
continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));
}
context.getStateManager().setState(stateMap, Scope.CLUSTER);
} catch(SolrServerException | SchemaNotFoundException | IOException e){
session.setState(stateMap, Scope.CLUSTER);
} catch (final SolrServerException | SchemaNotFoundException | IOException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), e}, e);
throw new ProcessException(e);
} catch( final Throwable t){
} catch (final Throwable t) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), t}, t);

View File

@ -38,7 +38,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@ -396,7 +395,7 @@ public class GetSplunk extends AbstractProcessor {
} else {
try {
// not provided so we need to check the previous state
final TimeRange previousRange = loadState(context.getStateManager());
final TimeRange previousRange = loadState(session);
final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
@ -412,7 +411,7 @@ public class GetSplunk extends AbstractProcessor {
// if it's the first time through, don't actually run; just save the state to get the
// initial time saved, and the next execution will be the first real execution
if (latestTime.equals(earliestTime)) {
saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
saveState(session, new TimeRange(earliestTime, latestTime));
return;
}
@ -496,7 +495,7 @@ public class GetSplunk extends AbstractProcessor {
// only need to do this for the managed time strategies
if (!PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) {
try {
saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime));
saveState(session, new TimeRange(earliestTime, latestTime));
} catch (IOException e) {
getLogger().error("Unable to load data from State Manager due to {}", new Object[]{e.getMessage()}, e);
session.rollback();
@ -550,7 +549,7 @@ public class GetSplunk extends AbstractProcessor {
return Service.connect(serviceArgs);
}
private void saveState(StateManager stateManager, TimeRange timeRange) throws IOException {
private void saveState(final ProcessSession session, TimeRange timeRange) throws IOException {
final String earliest = StringUtils.isBlank(timeRange.getEarliestTime()) ? "" : timeRange.getEarliestTime();
final String latest = StringUtils.isBlank(timeRange.getLatestTime()) ? "" : timeRange.getLatestTime();
@ -559,11 +558,11 @@ public class GetSplunk extends AbstractProcessor {
state.put(LATEST_TIME_KEY, latest);
getLogger().debug("Saving state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest});
stateManager.setState(state, Scope.CLUSTER);
session.setState(state, Scope.CLUSTER);
}
private TimeRange loadState(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
private TimeRange loadState(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");

View File

@ -22,7 +22,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
@ -206,17 +205,16 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
SqlWriter sqlWriter = configureSqlWriter(session, context);
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMap = session.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
context.yield();
return;
}
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
@ -387,13 +385,14 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
}
context.yield();
} finally {
session.commit();
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
session.setState(statePropertyMap, Scope.CLUSTER);
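// Deferred write: the observed maximum values are published when the session commit below succeeds.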
} catch (IOException ioe) {
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
}
session.commit();
}
}

View File

@ -260,21 +260,18 @@ public class EnforceOrder extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
final Integer batchCount = context.getProperty(BATCH_COUNT).asInteger();
List<FlowFile> flowFiles = session.get(batchCount);
final List<FlowFile> flowFiles = session.get(batchCount);
if (flowFiles == null || flowFiles.isEmpty()) {
return;
}
final StateMap stateMap;
try {
stateMap = context.getStateManager().getState(Scope.LOCAL);
stateMap = session.getState(Scope.LOCAL);
} catch (final IOException e) {
logger.error("Failed to retrieve state from StateManager due to {}" + e, e);
getLogger().error("Failed to retrieve state from StateManager due to {}" + e, e);
context.yield();
return;
}
@ -283,7 +280,7 @@ public class EnforceOrder extends AbstractProcessor {
oc.groupStates.putAll(stateMap.toMap());
for (FlowFile flowFile : flowFiles) {
for (final FlowFile flowFile : flowFiles) {
oc.setFlowFile(flowFile);
if (oc.flowFile == null) {
break;
@ -305,7 +302,7 @@ public class EnforceOrder extends AbstractProcessor {
oc.cleanupInactiveStates();
try {
context.getStateManager().setState(oc.groupStates, Scope.LOCAL);
session.setState(oc.groupStates, Scope.LOCAL);
} catch (final IOException e) {
throw new RuntimeException("Failed to update state due to " + e
+ ". Session will be rollback and processor will be yielded for a while.", e);

View File

@ -34,7 +34,6 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
@ -276,19 +275,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String customOrderByColumn = context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
FlowFile finalFileToProcess = fileToProcess;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMap = session.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
context.yield();
return;
}
try {
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
@ -549,14 +547,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
}
session.commit();
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
session.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+ "Also, any generated SQL statements may be duplicated.",
new Object[]{this, ioe});
+ "Also, any generated SQL statements may be duplicated.", this, ioe);
}
} catch (final ProcessException pe) {
// Log the cause of the ProcessException if it is available

View File

@ -16,33 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
@ -82,7 +55,6 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -103,6 +75,34 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
+ import javax.net.ssl.SSLContext;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.security.KeyManagementException;
+ import java.security.KeyStore;
+ import java.security.KeyStoreException;
+ import java.security.NoSuchAlgorithmException;
+ import java.security.UnrecoverableKeyException;
+ import java.security.cert.CertificateException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.regex.Pattern;
+ import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
+ import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
@Deprecated
@DeprecationNotice(alternatives = {InvokeHTTP.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@@ -462,7 +462,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
final StateMap beforeStateMap;
try {
- beforeStateMap = context.getStateManager().getState(Scope.LOCAL);
+ beforeStateMap = session.getState(Scope.LOCAL);
final String lastModified = beforeStateMap.get(LAST_MODIFIED + ":" + url);
if (lastModified != null) {
get.addHeader(HEADER_IF_MODIFIED_SINCE, parseStateValue(lastModified).getValue());
@@ -536,7 +536,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
session.commit();
- updateStateMap(context, response, beforeStateMap, url);
+ updateStateMap(context, session, response, beforeStateMap, url);
} catch (final IOException e) {
context.yield();
@@ -557,11 +557,10 @@
}
}
- private void updateStateMap(ProcessContext context, HttpResponse response, StateMap beforeStateMap, String url) {
+ private void updateStateMap(final ProcessContext context, final ProcessSession session, HttpResponse response, StateMap beforeStateMap, String url) {
try {
Map<String, String> workingMap = new HashMap<>();
workingMap.putAll(beforeStateMap.toMap());
- final StateManager stateManager = context.getStateManager();
StateMap oldValue = beforeStateMap;
long currentTime = System.currentTimeMillis();
@@ -576,11 +575,11 @@
workingMap.put(ETAG + ":" + url, currentTime + ":" + receivedEtag.getValue());
}
- boolean replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+ boolean replaceSucceeded = session.replaceState(oldValue, workingMap, Scope.LOCAL);
boolean changed;
while (!replaceSucceeded) {
- oldValue = stateManager.getState(Scope.LOCAL);
+ oldValue = session.getState(Scope.LOCAL);
workingMap.clear();
workingMap.putAll(oldValue.toMap());
@@ -605,7 +604,7 @@
}
if (changed) {
- replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+ replaceSucceeded = session.replaceState(oldValue, workingMap, Scope.LOCAL);
} else {
break;
}
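
The loop above is an optimistic compare-and-swap: replaceState() succeeds only if the stored state is still the exact version this thread last read; on failure the code re-reads the state, reapplies its changes, and tries again, or breaks once nothing would change. A generic distillation of that retry shape (a sketch only; the class and method names are mine, not NiFi API, and values under the key are assumed to be numeric):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.components.state.Scope;
    import org.apache.nifi.components.state.StateMap;
    import org.apache.nifi.processor.ProcessSession;

    public final class StateCompareAndSwap {

        private StateCompareAndSwap() {
        }

        /**
         * Stores key=timestamp unless an equal-or-newer timestamp is already present,
         * retrying whenever another thread wins the replace race.
         * Returns true if this call changed the state.
         */
        public static boolean putIfNewer(final ProcessSession session, final String key,
                final long timestamp, final Scope scope) throws IOException {
            while (true) {
                final StateMap oldValue = session.getState(scope);
                final String existing = oldValue.get(key);
                if (existing != null && Long.parseLong(existing) >= timestamp) {
                    return false; // an equal-or-newer value is already stored; nothing to do
                }

                final Map<String, String> workingMap = new HashMap<>(oldValue.toMap());
                workingMap.put(key, String.valueOf(timestamp));
                if (session.replaceState(oldValue, workingMap, scope)) {
                    return true; // the stored version matched oldValue, so the swap succeeded
                }
                // Lost the race: loop back, re-read, and re-evaluate against the newer state
            }
        }
    }

Note that not every caller wants this loop: the UpdateAttribute hunk at the end of this diff treats a single failed replaceState() as terminal and routes the FlowFile to a failure relationship instead of retrying.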

View File

@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
- import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
@@ -227,7 +226,7 @@ public class ListDatabaseTables extends AbstractProcessor {
}
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String catalog = context.getProperty(CATALOG).getValue();
@@ -239,11 +238,10 @@
final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
- final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
final Map<String, String> stateMapProperties;
try {
- stateMap = stateManager.getState(Scope.CLUSTER);
+ stateMap = session.getState(Scope.CLUSTER);
stateMapProperties = new HashMap<>(stateMap.toMap());
} catch (IOException ioe) {
throw new ProcessException(ioe);
@@ -345,13 +343,8 @@
writer.finishListing();
}
// Update the timestamps for listed tables
- if (stateMap.getVersion() == -1) {
- stateManager.setState(stateMapProperties, Scope.CLUSTER);
- } else {
- stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
- }
+ session.replaceState(stateMap, stateMapProperties, Scope.CLUSTER);
} catch (final SQLException | IOException | SchemaNotFoundException e) {
writer.finishListingExceptionally(e);
session.rollback();
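
One detail worth calling out in the ListDatabaseTables hunk above: the StateManager code had to branch on getVersion() == -1, presumably because a never-set StateMap could not simply be replaced, while the session-level call is now used unconditionally, which implies replaceState() treats a version of -1 as an initial set. Condensed before/after (a sketch of the change, not commit code):

    // Before: the caller distinguishes "first write" from "concurrent update"
    if (stateMap.getVersion() == -1) {
        stateManager.setState(stateMapProperties, Scope.CLUSTER);
    } else {
        stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
    }

    // After: one call covers both cases
    session.replaceState(stateMap, stateMapProperties, Scope.CLUSTER);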

View File

@@ -283,7 +283,7 @@ public class MonitorActivity extends AbstractProcessor {
// Even if this node has been inactive, there may be other nodes handling flow actively.
// However, if this node is active, we don't have to look at cluster state.
try {
- clusterState = context.getStateManager().getState(Scope.CLUSTER);
+ clusterState = session.getState(Scope.CLUSTER);
if (clusterState != null && !StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
final long latestReportedClusterActivity = Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
isInactive = (now >= latestReportedClusterActivity + thresholdMillis);
@@ -336,8 +336,7 @@
&& (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
// We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
try {
- final StateManager stateManager = context.getStateManager();
- final StateMap state = stateManager.getState(Scope.CLUSTER);
+ final StateMap state = session.getState(Scope.CLUSTER);
final Map<String, String> newValues = new HashMap<>();
@@ -348,14 +347,14 @@
newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
if (state == null || state.getVersion() == -1) {
- stateManager.setState(newValues, Scope.CLUSTER);
+ session.setState(newValues, Scope.CLUSTER);
} else {
final String existingTimestamp = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
if (StringUtils.isEmpty(existingTimestamp)
|| Long.parseLong(existingTimestamp) < now) {
// If this returns false due to race condition, it's not a problem since we just need
// the latest active timestamp.
- stateManager.replace(state, newValues, Scope.CLUSTER);
+ session.replaceState(state, newValues, Scope.CLUSTER);
} else {
logger.debug("Existing state has more recent timestamp, didn't update state.");
}
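
MonitorActivity, unlike GetHTTP, keeps its version check and deliberately tolerates a false return from replaceState(): as the retained comment notes, only the latest success timestamp matters, so losing the race just means another node recorded something at least as fresh. Condensed (a sketch; shouldAdvance() is a hypothetical stand-in for the timestamp comparison shown above):

    if (state == null || state.getVersion() == -1) {
        session.setState(newValues, Scope.CLUSTER);             // first write for this scope
    } else if (shouldAdvance(state, now)) {
        session.replaceState(state, newValues, Scope.CLUSTER);  // best effort; false is acceptable here
    }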

View File

@@ -598,7 +598,7 @@ public class TailFile extends AbstractProcessor {
try {
final List<String> filesToTail = lookup(context);
final Scope scope = getStateScope(context);
- final StateMap stateMap = context.getStateManager().getState(scope);
+ final StateMap stateMap = session.getState(scope);
initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue());
} catch (IOException e) {
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
@@ -769,7 +769,7 @@
// no data to consume so rather than continually running, yield to allow other processors to use the thread.
getLogger().debug("No data to consume; created no FlowFiles");
tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
- persistState(tfo, context);
+ persistState(tfo, session, context);
context.yield();
return;
}
@@ -854,9 +852,7 @@
// Create a new state object to represent our current position, timestamp, etc.
tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
- // We must commit session before persisting state in order to avoid data loss on restart
- session.commit();
- persistState(tfo, context);
+ persistState(tfo, session, context);
}
/**
@@ -1036,13 +1034,13 @@
return Scope.LOCAL;
}
- private void persistState(final TailFileObject tfo, final ProcessContext context) {
- persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), context);
+ private void persistState(final TailFileObject tfo, final ProcessSession session, final ProcessContext context) {
+ persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), session, context);
}
- private void persistState(final Map<String, String> state, final ProcessContext context) {
+ private void persistState(final Map<String, String> state, final ProcessSession session, final ProcessContext context) {
try {
- StateMap oldState = context.getStateManager().getState(getStateScope(context));
+ final StateMap oldState = session.getState(getStateScope(context));
Map<String, String> updatedState = new HashMap<String, String>();
for(String key : oldState.toMap().keySet()) {
@@ -1059,7 +1057,8 @@
}
updatedState.putAll(state);
- context.getStateManager().setState(updatedState, getStateScope(context));
+ session.setState(updatedState, getStateScope(context));
} catch (final IOException e) {
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", new Object[]{e});
}
@@ -1207,9 +1206,7 @@
cleanup();
tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
- // must ensure that we do session.commit() before persisting state in order to avoid data loss.
- session.commit();
- persistState(tfo, context);
+ persistState(tfo, session, context);
}
} else {
getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
@@ -1267,9 +1264,7 @@
tfo.setState(new TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null,
tfo.getState().getBuffer()));
- // must ensure that we do session.commit() before persisting state in order to avoid data loss.
- session.commit();
- persistState(tfo, context);
+ persistState(tfo, session, context);
}
return tfo.getState();
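
Every TailFile hunk above also drops an explicit session.commit() that previously had to run before persistState(). With session-scoped state, that hand-written guard looks redundant by design: session.setState() only stages the new offsets, and the same commit() that makes the FlowFile transfers durable publishes the staged state, so the recorded position can no longer get ahead of the data it describes. The shape change, compressed (a sketch under that assumption; persistState and tfo are TailFile's own members):

    // Before: caller enforces "data first, then state" by hand
    session.commit();
    persistState(tfo, context);           // wrote through context.getStateManager()

    // After: the ordering is implicit in the session lifecycle
    persistState(tfo, session, context);  // stages via session.setState(), applied at commit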

View File

@@ -185,11 +185,9 @@ public class AttributeRollingWindow extends AbstractProcessor {
}
private void noMicroBatch(ProcessContext context, ProcessSession session, FlowFile flowFile, Long currTime) {
- final StateManager stateManager = context.getStateManager();
Map<String, String> state = null;
try {
- state = new HashMap<>(stateManager.getState(SCOPE).toMap());
+ state = new HashMap<>(session.getState(SCOPE).toMap());
} catch (IOException e) {
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
session.transfer(flowFile);
@@ -233,10 +231,10 @@
aggregateValue += currentFlowFileValue;
state.put(String.valueOf(currTime), String.valueOf(currentFlowFileValue));
state.put(COUNT_KEY, countString);
try {
- stateManager.setState(state, SCOPE);
+ session.setState(state, SCOPE);
} catch (IOException e) {
getLogger().error("Failed to set the state after successfully processing {} due a failure when setting the state. Transferring to '{}'",
new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e);
@@ -259,11 +257,9 @@
}
private void microBatch(ProcessContext context, ProcessSession session, FlowFile flowFile, Long currTime) {
- final StateManager stateManager = context.getStateManager();
Map<String, String> state = null;
try {
- state = new HashMap<>(stateManager.getState(SCOPE).toMap());
+ state = new HashMap<>(session.getState(SCOPE).toMap());
} catch (IOException e) {
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
session.transfer(flowFile);
@@ -355,7 +351,7 @@
state.put(currBatchStart + COUNT_APPEND_KEY, String.valueOf(currentBatchCount));
try {
- stateManager.setState(state, SCOPE);
+ session.setState(state, SCOPE);
} catch (IOException e) {
getLogger().error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{flowFile}, e);
session.transfer(flowFile);

View File

@@ -457,7 +457,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
try {
if (stateful) {
- stateMap = context.getStateManager().getState(Scope.LOCAL);
+ stateMap = session.getState(Scope.LOCAL);
stateInitialAttributes = stateMap.toMap();
stateWorkingAttributes = new HashMap<>(stateMap.toMap());
} else {
@@ -513,7 +513,7 @@
// Able to use "equals()" since we're just checking if the map was modified at all
if (!stateWorkingAttributes.equals(stateInitialAttributes)) {
- boolean setState = context.getStateManager().replace(stateMap, stateWorkingAttributes, Scope.LOCAL);
+ boolean setState = session.replaceState(stateMap, stateWorkingAttributes, Scope.LOCAL);
if (!setState) {
logger.warn("Failed to update the state after successfully processing {} due to having an old version of the StateMap. This is normally due to multiple threads running at " +
"once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()});