NIFI-259: Update GetHBase to use new State Management feature; updated docs; bug fixes

This commit is contained in:
Mark Payne 2016-01-13 13:30:05 -05:00
parent bbce596d74
commit d39067ede6
2 changed files with 54 additions and 6 deletions

View File

@ -363,11 +363,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return;
}
int listCount = 0;
Long latestListingTimestamp = null;
final List<T> newEntries = new ArrayList<>();
for (final T entity : entityList) {
final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp
|| (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
final boolean newTimestamp = minTimestamp == null || entity.getTimestamp() > minTimestamp;
final boolean newEntryForTimestamp = minTimestamp != null && entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier());
final boolean list = newTimestamp || newEntryForTimestamp;
// Create the FlowFile for this path.
if (list) {
@ -375,7 +376,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
listCount++;
// If we don't have a new timestamp but just have a new entry, we need to
// add all of the previous entries to our entityList. If we have a new timestamp,
// then the previous entries can go away.
if (!newTimestamp) {
newEntries.addAll(entityList);
}
newEntries.add(entity);
if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) {
latestListingTimestamp = entity.getTimestamp();
@ -383,6 +391,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
final int listCount = newEntries.size();
if (listCount > 0) {
getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount});
session.commit();
@ -395,9 +404,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// previously Primary Node left off.
// We also store the state locally so that if the node is restarted, and the node cannot contact
// the distributed state cache, the node can continue to run (if it is primary node).
final Set<String> identifiers = new HashSet<>(entityList.size());
final Set<String> identifiers = new HashSet<>(newEntries.size());
try {
for (final T entity : entityList) {
for (final T entity : newEntries) {
identifiers.add(entity.getIdentifier());
}
persist(latestListingTimestamp, identifiers, context.getStateManager());

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@ -30,6 +31,7 @@ import java.util.UUID;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@ -154,6 +156,43 @@ public class TestAbstractListProcessor {
assertEquals(1, cache.fetchCount);
}
@Test
public void testOnlyNewStateStored() throws IOException {
final ConcreteListProcessor proc = new ConcreteListProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", 1492L);
proc.addEntity("name", "id2", 1492L);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
runner.clearTransferState();
final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(1, stateMap.getVersion());
final Map<String, String> map = stateMap.toMap();
assertEquals(3, map.size());
assertEquals("1492", map.get("timestamp"));
assertTrue(map.containsKey("id.1"));
assertTrue(map.containsKey("id.2"));
proc.addEntity("new name", "new id", 1493L);
runner.run();
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
final StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(2, updatedStateMap.getVersion());
final Map<String, String> updatedValues = updatedStateMap.toMap();
assertEquals(2, updatedValues.size());
assertEquals("1493", updatedValues.get("timestamp"));
assertEquals("new id", updatedValues.get("id.1"));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
private final Map<Object, Object> stored = new HashMap<>();
private int fetchCount = 0;