From bbce596d741d8c660291bc8c788c44173c0bb39c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 13 Jan 2016 12:47:08 -0500 Subject: [PATCH] NIFI-259: Updated GetHBase to use new State Management; bug fixes; updated docs --- .../main/asciidoc/administration-guide.adoc | 22 +- .../zookeeper/ZooKeeperStateProvider.java | 6 +- .../nifi/processors/hadoop/ListHDFS.java | 6 +- .../java/org/apache/nifi/hbase/GetHBase.java | 221 ++++++++++++------ .../org/apache/nifi/hbase/TestGetHBase.java | 93 +++++--- .../standard/AbstractListProcessor.java | 2 +- 6 files changed, 248 insertions(+), 102 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index b24ec662ce..4f306866f3 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -443,7 +443,27 @@ in _nifi.properties_ also becomes relevant. This specifies the ZooKeeper propert with the list of ZooKeeper servers. Each of these servers is configured as <hostname>:<quorum port>[:<leader election port>]. For example, `myhost:2888:3888`. This list of nodes should be the same nodes in the NiFi cluster that have the `nifi.state.management.embedded.zookeeper.start` property set to `true`. Also note that because ZooKeeper will be listening on these ports, the firewall may need to be configured to open these ports -for incoming traffic, at least between nodes in the cluster. +for incoming traffic, at least between nodes in the cluster. Additionally, the port to listen on for client connections must be opened in the firewall. +The default value for this is _2181_ but can be configured via the _clientPort_ property in the _zookeeper.properties_ file. + +When using an embedded ZooKeeper, the _conf/zookeeper.properties_ file has a property named `dataDir`. By default, this value is set to `./state/zookeeper`. +If more than one NiFi node is running an embedded ZooKeeper, it is important to tell the server which one it is. 
This is accomplished by creating a file named +_myid_ and placing it in ZooKeeper's data directory. The contents of this file should be the index of the server. So for one of the ZooKeeper servers, we will +accomplish this by performing the following commands: + +[source] +cd $NIFI_HOME +mkdir state +mkdir state/zookeeper +echo 1 > state/zookeeper/myid + +For the next NiFi Node that will run ZooKeeper, we can accomplish this by performing the following commands: +cd $NIFI_HOME +mkdir state +mkdir state/zookeeper +echo 2 > state/zookeeper/myid + +And so on. For more information on the properties used to administer ZooKeeper, see the link:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html[ZooKeeper Admin Guide]. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index 0b66367c48..faa0364784 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -102,7 +102,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { .sensitive(true) .build(); - private static final int ENCODING_VERSION = 1; + private static final byte ENCODING_VERSION = 1; private ZooKeeper zooKeeper; @@ -251,7 +251,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { private byte[] serialize(final Map stateValues) throws IOException { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream dos = new DataOutputStream(baos)) { - 
dos.writeInt(ENCODING_VERSION); + dos.writeByte(ENCODING_VERSION); dos.writeInt(stateValues.size()); for (final Map.Entry entry : stateValues.entrySet()) { dos.writeUTF(entry.getKey()); @@ -265,7 +265,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { try (final ByteArrayInputStream bais = new ByteArrayInputStream(data); final DataInputStream dis = new DataInputStream(bais)) { - final int encodingVersion = dis.readInt(); + final byte encodingVersion = dis.readByte(); if (encodingVersion > ENCODING_VERSION) { throw new IOException("Retrieved a response from ZooKeeper when retrieving state for component with ID " + componentId + ", but the response was encoded using the ZooKeeperStateProvider Encoding Version of " + encodingVersion diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 3645eecc3a..d6c0c4e45c 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -188,7 +188,7 @@ public class ListHDFS extends AbstractHadoopProcessor { if (stateMap.getVersion() == -1L) { final HDFSListing serviceListing = getListingFromService(context); if (serviceListing != null) { - persistState(serviceListing, context.getStateManager()); + context.getStateManager().setState(serviceListing.toMap(), Scope.CLUSTER); } } } @@ -213,10 +213,6 @@ public class ListHDFS extends AbstractHadoopProcessor { } } - private void persistState(final HDFSListing listing, final StateManager stateManager) throws IOException { - final Map stateValues = listing.toMap(); - stateManager.setState(stateValues, Scope.CLUSTER); - } private Long getMinTimestamp(final String directory, 
final HDFSListing remoteListing) throws IOException { // No cluster-wide state has been recovered. Just use whatever values we already have. diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 98a612cd31..65b261ab4b 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -16,8 +16,30 @@ */ package org.apache.nifi.hbase; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -32,6 +54,9 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; 
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.io.JsonRowSerializer; @@ -50,28 +75,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.ObjectHolder; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - @TriggerWhenEmpty @TriggerSerially @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -84,6 +87,9 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"), @WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON") }) +@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) " + + "and the value of each cell that has that timestamp as its modification date. 
This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is " + + "run on Primary Node only and the Primary Node changes.") public class GetHBase extends AbstractProcessor { static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*"); @@ -101,7 +107,7 @@ public class GetHBase extends AbstractProcessor { .name("Distributed Cache Service") .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" + " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.") - .required(true) + .required(false) .identifiesControllerService(DistributedMapCacheClient.class) .build(); static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() @@ -197,7 +203,20 @@ public class GetHBase extends AbstractProcessor { } @OnScheduled - public void parseColumns(final ProcessContext context) { + public void parseColumns(final ProcessContext context) throws IOException { + final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + if (stateMap.getVersion() < 0) { + // no state has been stored in the State Manager - check if we have state stored in the + // DistributedMapCacheClient service and migrate it if so + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + final ScanResult scanResult = getState(client); + if (scanResult != null) { + storeState(scanResult, context.getStateManager()); + } + + clearState(client); + } + final String columnsValue = context.getProperty(COLUMNS).getValue(); final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? 
new String[0] : columnsValue.split(",")); @@ -225,7 +244,9 @@ public class GetHBase extends AbstractProcessor { @OnRemoved public void onRemoved(final ProcessContext context) { final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - clearState(client); + if (client != null) { + clearState(client); + } } @Override @@ -234,11 +255,14 @@ public class GetHBase extends AbstractProcessor { final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue(); final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue(); final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); // if the table was changed then remove any previous state if (previousTable != null && !tableName.equals(previousTable)) { - clearState(client); + try { + context.getStateManager().clear(Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().warn("Failed to clear Cluster State", ioe); + } previousTable = tableName; } @@ -246,7 +270,7 @@ public class GetHBase extends AbstractProcessor { final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final RowSerializer serializer = new JsonRowSerializer(charset); - this.lastResult = getState(client); + this.lastResult = getState(context.getStateManager()); final long defaultMinTime = (initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis()); final long minTime = (lastResult == null ? 
defaultMinTime : lastResult.getTimestamp()); @@ -388,8 +412,7 @@ public class GetHBase extends AbstractProcessor { } // save state to local storage and to distributed cache - persistState(client, lastResult); - + storeState(lastResult, context.getStateManager()); } catch (final IOException e) { getLogger().error("Failed to receive data from HBase due to {}", e); session.rollback(); @@ -421,27 +444,11 @@ public class GetHBase extends AbstractProcessor { return columns; } - private void persistState(final DistributedMapCacheClient client, final ScanResult scanResult) { - final File stateDir = getStateDir(); - if (!stateDir.exists()) { - stateDir.mkdirs(); - } - - final File file = getStateFile(); - try (final OutputStream fos = new FileOutputStream(file); - final ObjectOutputStream oos = new ObjectOutputStream(fos)) { - oos.writeObject(scanResult); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); - } - - try { - client.put(getKey(), scanResult, new StringSerDe(), new ObjectSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Unable to communicate with distributed cache server due to {}. 
Persisting state locally instead.", ioe); - } + private void storeState(final ScanResult scanResult, final StateManager stateManager) throws IOException { + stateManager.setState(scanResult.toFlatMap(), Scope.CLUSTER); } + private void clearState(final DistributedMapCacheClient client) { final File localState = getStateFile(); if (localState.exists()) { @@ -455,6 +462,16 @@ public class GetHBase extends AbstractProcessor { } } + + private ScanResult getState(final StateManager stateManager) throws IOException { + final StateMap stateMap = stateManager.getState(Scope.CLUSTER); + if (stateMap.getVersion() < 0) { + return null; + } + + return ScanResult.fromFlatMap(stateMap.toMap()); + } + private ScanResult getState(final DistributedMapCacheClient client) throws IOException { final StringSerDe stringSerDe = new StringSerDe(); final ObjectSerDe objectSerDe = new ObjectSerDe(); @@ -462,12 +479,14 @@ public class GetHBase extends AbstractProcessor { ScanResult scanResult = lastResult; // if we have no previous result, or we just became primary, pull from distributed cache if (scanResult == null || electedPrimaryNode) { - final Object obj = client.get(getKey(), stringSerDe, objectSerDe); - if (obj == null || !(obj instanceof ScanResult)) { - scanResult = null; - } else { - scanResult = (ScanResult) obj; - getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}" , new Object[] {scanResult.getTimestamp()}); + if (client != null) { + final Object obj = client.get(getKey(), stringSerDe, objectSerDe); + if (obj == null || !(obj instanceof ScanResult)) { + scanResult = null; + } else { + scanResult = (ScanResult) obj; + getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()}); + } } // no requirement to pull an update from the distributed cache anymore. 
@@ -487,16 +506,6 @@ public class GetHBase extends AbstractProcessor { if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) { scanResult = localScanResult; getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache"); - - // Our local persistence file shows a later time than the Distributed service. - // Update the distributed service to match our local state. - try { - client.put(getKey(), localScanResult, stringSerDe, objectSerDe); - } catch (final IOException ioe) { - getLogger().warn("Local timestamp is {}, which is later than Distributed state but failed to update Distributed " - + "state due to {}. If a new node performs GetHBase Listing, data duplication may occur", - new Object[] {localScanResult.getTimestamp(), ioe}); - } } } } catch (final IOException | ClassNotFoundException ioe) { @@ -514,6 +523,13 @@ public class GetHBase extends AbstractProcessor { private final long latestTimestamp; private final Map> matchingCellHashes; + private static final Pattern CELL_ID_PATTERN = Pattern.compile(Pattern.quote(StateKeys.ROW_ID_PREFIX) + "(\\d+)(\\.(\\d+))?"); + + public static class StateKeys { + public static final String TIMESTAMP = "timestamp"; + public static final String ROW_ID_PREFIX = "row."; + } + public ScanResult(final long timestamp, final Map> cellHashes) { latestTimestamp = timestamp; matchingCellHashes = cellHashes; @@ -543,6 +559,81 @@ public class GetHBase extends AbstractProcessor { final String cellHash = new String(cellValue, StandardCharsets.UTF_8); return cellHashes.contains(cellHash); } + + public Map toFlatMap() { + final Map map = new HashMap<>(); + map.put(StateKeys.TIMESTAMP, String.valueOf(latestTimestamp)); + + int rowCounter = 0; + for (final Map.Entry> entry : matchingCellHashes.entrySet()) { + final String rowId = entry.getKey(); + + final String rowIdKey = StateKeys.ROW_ID_PREFIX + rowCounter; + final String 
cellKeyPrefix = rowIdKey + "."; + map.put(rowIdKey, rowId); + + final Set cellValues = entry.getValue(); + int cellCounter = 0; + for (final String cellValue : cellValues) { + final String cellId = cellKeyPrefix + (cellCounter++); + map.put(cellId, cellValue); + } + + rowCounter++; + } + + return map; + } + + public static ScanResult fromFlatMap(final Map map) { + if (map == null) { + return null; + } + + final String timestampValue = map.get(StateKeys.TIMESTAMP); + if (timestampValue == null) { + return null; + } + + final long timestamp = Long.parseLong(timestampValue); + final Map> rowIndexToMatchingCellHashes = new HashMap<>(); + final Map rowIndexToId = new HashMap<>(); + + for (final Map.Entry entry : map.entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = CELL_ID_PATTERN.matcher(key); + if (!matcher.matches()) { + // if it's not a valid key, move on. + continue; + } + + final String rowIndex = matcher.group(1); + final String cellIndex = matcher.group(3); + + Set cellHashes = rowIndexToMatchingCellHashes.get(rowIndex); + if (cellHashes == null) { + cellHashes = new HashSet<>(); + rowIndexToMatchingCellHashes.put(rowIndex, cellHashes); + } + + if (cellIndex == null) { + // this provides a Row ID. 
+ rowIndexToId.put(rowIndex, entry.getValue()); + } else { + cellHashes.add(entry.getValue()); + } + } + + final Map> matchingCellHashes = new HashMap<>(rowIndexToMatchingCellHashes.size()); + for (final Map.Entry> entry : rowIndexToMatchingCellHashes.entrySet()) { + final String rowIndex = entry.getKey(); + final String rowId = rowIndexToId.get(rowIndex); + final Set cellValues = entry.getValue(); + matchingCellHashes.put(rowId, cellValues); + } + + return new ScanResult(timestamp, matchingCellHashes); + } } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java index 92f42f2e6a..8f6d8907a4 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java @@ -16,11 +16,31 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import 
org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.hbase.GetHBase.ScanResult; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.util.StringSerDe; import org.apache.nifi.reporting.InitializationException; @@ -31,17 +51,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - public class TestGetHBase { private TestRunner runner; @@ -148,27 +157,17 @@ public class TestGetHBase { hBaseClient.addResult("row4", cells, now + 1); runner.run(); runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5); + runner.clearTransferState(); proc = new MockGetHBase(stateFile); - final TestRunner newRunner = TestRunners.newTestRunner(proc); - - newRunner.addControllerService("cacheClient", cacheClient); - newRunner.enableControllerService(cacheClient); - - newRunner.addControllerService("hbaseClient", hBaseClient); - newRunner.enableControllerService(hBaseClient); - - newRunner.setProperty(GetHBase.TABLE_NAME, "nifi"); - newRunner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient"); - newRunner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); hBaseClient.addResult("row0", cells, now - 2); hBaseClient.addResult("row1", cells, now - 1); hBaseClient.addResult("row2", cells, now - 1); hBaseClient.addResult("row3", cells, now); - newRunner.run(100); - newRunner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0); + runner.run(100); + runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0); } @Test @@ -271,8 +270,7 @@ public class TestGetHBase { runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4); // should have a local state file and a cache entry 
before removing - Assert.assertTrue(proc.getStateFile().exists()); - Assert.assertTrue(cacheClient.containsKey(proc.getKey(), new StringSerDe())); + runner.getStateManager().assertStateSet(Scope.CLUSTER); proc.onRemoved(runner.getProcessContext()); @@ -331,7 +329,7 @@ public class TestGetHBase { } @Test - public void testParseColumns() { + public void testParseColumns() throws IOException { runner.setProperty(GetHBase.COLUMNS, "cf1,cf2:cq1,cf3"); proc.parseColumns(runner.getProcessContext()); @@ -365,6 +363,47 @@ public class TestGetHBase { runner.assertNotValid(); } + + @Test + public void testScanResultConvert() { + final long timestamp = 14L; + final Map> cellHashes = new LinkedHashMap<>(); + + final Set row1Cells = new HashSet<>(); + row1Cells.add("hello"); + row1Cells.add("there"); + cellHashes.put("abc", row1Cells); + + final Set row2Cells = new HashSet<>(); + row2Cells.add("good-bye"); + row2Cells.add("there"); + cellHashes.put("xyz", row2Cells); + + final ScanResult scanResult = new GetHBase.ScanResult(timestamp, cellHashes); + + final Map flatMap = scanResult.toFlatMap(); + assertEquals(7, flatMap.size()); + assertEquals("abc", flatMap.get("row.0")); + + final String row0Cell0 = flatMap.get("row.0.0"); + final String row0Cell1 = flatMap.get("row.0.1"); + assertTrue(row0Cell0.equals("hello") || row0Cell0.equals("there")); + assertTrue(row0Cell1.equals("hello") || row0Cell1.equals("there")); + assertNotSame(row0Cell0, row0Cell1); + + assertEquals("xyz", flatMap.get("row.1")); + final String row1Cell0 = flatMap.get("row.1.0"); + final String row1Cell1 = flatMap.get("row.1.1"); + assertTrue(row1Cell0.equals("good-bye") || row1Cell0.equals("there")); + assertTrue(row1Cell1.equals("good-bye") || row1Cell1.equals("there")); + assertNotSame(row1Cell0, row1Cell1); + + final ScanResult reverted = ScanResult.fromFlatMap(flatMap); + assertEquals(timestamp, reverted.getTimestamp()); + assertEquals(cellHashes, reverted.getMatchingCells()); + } + + // Mock processor to 
override the location of the state file private static class MockGetHBase extends GetHBase { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index c9bf369faa..efe551f3db 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -402,7 +402,7 @@ public abstract class AbstractListProcessor extends Ab } persist(latestListingTimestamp, identifiers, context.getStateManager()); } catch (final IOException ioe) { - getLogger().warn("Unable to save state due to {}. If NiFi restarted before state is saved, or " + getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " + "if another node begins executing this Processor, data duplication may occur.", ioe); }