NIFI-259: Updated GetHBase to use new State Management; bug fixes; updated docs

This commit is contained in:
Mark Payne 2016-01-13 12:47:08 -05:00
parent 774c29a4da
commit bbce596d74
6 changed files with 248 additions and 102 deletions

View File

@ -443,7 +443,27 @@ in _nifi.properties_ also becomes relevant. This specifies the ZooKeeper propert
with the list of ZooKeeper servers. Each of these servers is configured as <hostname>:<client port>[:<leader election port>]. For example, `myhost:2888:3888`.
This list of nodes should be the same nodes in the NiFi cluster that have the `nifi.state.management.embedded.zookeeper.start`
property set to `true`. Also note that because ZooKeeper will be listening on these ports, the firewall may need to be configured to open these ports
for incoming traffic, at least between nodes in the cluster.
for incoming traffic, at least between nodes in the cluster. Additionally, the port to listen on for client connections must be opened in the firewall.
The default value for this is _2181_ but can be configured via the _clientPort_ property in the _zookeeper.properties_ file.
When using an embedded ZooKeeper, the _conf/zookeeper.properties_ file has a property named `dataDir`. By default, this value is set to `./state/zookeeper`.
If more than one NiFi node is running an embedded ZooKeeper, it is important to tell the server which one it is. This is accomplished by creating a file named
_myid_ and placing it in ZooKeeper's data directory. The contents of this file should be the index of the server. So for one of the ZooKeeper servers, we will
accomplish this by performing the following commands:
[source]
cd $NIFI_HOME
mkdir state
mkdir state/zookeeper
echo 1 > state/zookeeper/myid
For the next NiFi Node that will run ZooKeeper, we can accomplish this by performing the following commands:
cd $NIFI_HOME
mkdir state
mkdir state/zookeeper
echo 2 > state/zookeeper/myid
And so on.
For more information on the properties used to administer ZooKeeper, see the
link:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html[ZooKeeper Admin Guide].

View File

@ -102,7 +102,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
.sensitive(true)
.build();
private static final int ENCODING_VERSION = 1;
private static final byte ENCODING_VERSION = 1;
private ZooKeeper zooKeeper;
@ -251,7 +251,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
private byte[] serialize(final Map<String, String> stateValues) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeInt(ENCODING_VERSION);
dos.writeByte(ENCODING_VERSION);
dos.writeInt(stateValues.size());
for (final Map.Entry<String, String> entry : stateValues.entrySet()) {
dos.writeUTF(entry.getKey());
@ -265,7 +265,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(data);
final DataInputStream dis = new DataInputStream(bais)) {
final int encodingVersion = dis.readInt();
final byte encodingVersion = dis.readByte();
if (encodingVersion > ENCODING_VERSION) {
throw new IOException("Retrieved a response from ZooKeeper when retrieving state for component with ID " + componentId
+ ", but the response was encoded using the ZooKeeperStateProvider Encoding Version of " + encodingVersion

View File

@ -188,7 +188,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
if (stateMap.getVersion() == -1L) {
final HDFSListing serviceListing = getListingFromService(context);
if (serviceListing != null) {
persistState(serviceListing, context.getStateManager());
context.getStateManager().setState(serviceListing.toMap(), Scope.CLUSTER);
}
}
}
@ -213,10 +213,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
}
/**
 * Stores the given listing as cluster-scoped state.
 *
 * @param listing the listing whose map form is stored
 * @param stateManager the State Manager that receives the values
 * @throws IOException if the State Manager fails to store the values
 */
private void persistState(final HDFSListing listing, final StateManager stateManager) throws IOException {
    stateManager.setState(listing.toMap(), Scope.CLUSTER);
}
private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException {
// No cluster-wide state has been recovered. Just use whatever values we already have.

View File

@ -16,8 +16,30 @@
*/
package org.apache.nifi.hbase;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -32,6 +54,9 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.io.JsonRowSerializer;
@ -50,28 +75,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@ -84,6 +87,9 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
})
@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) "
+ "and the value of each cell that has that timestamp as its modification date. This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is "
+ "run on Primary Node only and the Primary Node changes.")
public class GetHBase extends AbstractProcessor {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@ -101,7 +107,7 @@ public class GetHBase extends AbstractProcessor {
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
" so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
.required(true)
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@ -197,7 +203,20 @@ public class GetHBase extends AbstractProcessor {
}
@OnScheduled
public void parseColumns(final ProcessContext context) {
public void parseColumns(final ProcessContext context) throws IOException {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.getVersion() < 0) {
// no state has been stored in the State Manager - check if we have state stored in the
// DistributedMapCacheClient service and migrate it if so
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
final ScanResult scanResult = getState(client);
if (scanResult != null) {
storeState(scanResult, context.getStateManager());
}
clearState(client);
}
final String columnsValue = context.getProperty(COLUMNS).getValue();
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
@ -225,7 +244,9 @@ public class GetHBase extends AbstractProcessor {
@OnRemoved
public void onRemoved(final ProcessContext context) {
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
clearState(client);
if (client != null) {
clearState(client);
}
}
@Override
@ -234,11 +255,14 @@ public class GetHBase extends AbstractProcessor {
final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
// if the table was changed then remove any previous state
if (previousTable != null && !tableName.equals(previousTable)) {
clearState(client);
try {
context.getStateManager().clear(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to clear Cluster State", ioe);
}
previousTable = tableName;
}
@ -246,7 +270,7 @@ public class GetHBase extends AbstractProcessor {
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final RowSerializer serializer = new JsonRowSerializer(charset);
this.lastResult = getState(client);
this.lastResult = getState(context.getStateManager());
final long defaultMinTime = (initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis());
final long minTime = (lastResult == null ? defaultMinTime : lastResult.getTimestamp());
@ -388,8 +412,7 @@ public class GetHBase extends AbstractProcessor {
}
// save state to local storage and to distributed cache
persistState(client, lastResult);
storeState(lastResult, context.getStateManager());
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
@ -421,27 +444,11 @@ public class GetHBase extends AbstractProcessor {
return columns;
}
private void persistState(final DistributedMapCacheClient client, final ScanResult scanResult) {
final File stateDir = getStateDir();
if (!stateDir.exists()) {
stateDir.mkdirs();
}
final File file = getStateFile();
try (final OutputStream fos = new FileOutputStream(file);
final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(scanResult);
} catch (final IOException ioe) {
getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
}
try {
client.put(getKey(), scanResult, new StringSerDe(), new ObjectSerDe());
} catch (final IOException ioe) {
getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
}
/**
 * Writes the given scan result to the cluster-scoped state.
 *
 * @param scanResult the result to store; it is flattened with {@code toFlatMap()} first
 * @param stateManager the State Manager that receives the flattened values
 * @throws IOException if the State Manager fails to store the values
 */
private void storeState(final ScanResult scanResult, final StateManager stateManager) throws IOException {
    final Map<String, String> flattenedState = scanResult.toFlatMap();
    stateManager.setState(flattenedState, Scope.CLUSTER);
}
private void clearState(final DistributedMapCacheClient client) {
final File localState = getStateFile();
if (localState.exists()) {
@ -455,6 +462,16 @@ public class GetHBase extends AbstractProcessor {
}
}
/**
 * Reads the last scan result from the cluster-scoped state.
 *
 * @param stateManager the State Manager to read from
 * @return the stored scan result, or {@code null} when the state map reports a negative version
 *         (treated elsewhere in this class as "no state has been stored")
 * @throws IOException if the State Manager cannot be read
 */
private ScanResult getState(final StateManager stateManager) throws IOException {
    final StateMap clusterState = stateManager.getState(Scope.CLUSTER);
    final boolean stateStored = clusterState.getVersion() >= 0;
    return stateStored ? ScanResult.fromFlatMap(clusterState.toMap()) : null;
}
private ScanResult getState(final DistributedMapCacheClient client) throws IOException {
final StringSerDe stringSerDe = new StringSerDe();
final ObjectSerDe objectSerDe = new ObjectSerDe();
@ -462,12 +479,14 @@ public class GetHBase extends AbstractProcessor {
ScanResult scanResult = lastResult;
// if we have no previous result, or we just became primary, pull from distributed cache
if (scanResult == null || electedPrimaryNode) {
final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
if (obj == null || !(obj instanceof ScanResult)) {
scanResult = null;
} else {
scanResult = (ScanResult) obj;
getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}" , new Object[] {scanResult.getTimestamp()});
if (client != null) {
final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
if (obj == null || !(obj instanceof ScanResult)) {
scanResult = null;
} else {
scanResult = (ScanResult) obj;
getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()});
}
}
// no requirement to pull an update from the distributed cache anymore.
@ -487,16 +506,6 @@ public class GetHBase extends AbstractProcessor {
if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) {
scanResult = localScanResult;
getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache");
// Our local persistence file shows a later time than the Distributed service.
// Update the distributed service to match our local state.
try {
client.put(getKey(), localScanResult, stringSerDe, objectSerDe);
} catch (final IOException ioe) {
getLogger().warn("Local timestamp is {}, which is later than Distributed state but failed to update Distributed "
+ "state due to {}. If a new node performs GetHBase Listing, data duplication may occur",
new Object[] {localScanResult.getTimestamp(), ioe});
}
}
}
} catch (final IOException | ClassNotFoundException ioe) {
@ -514,6 +523,13 @@ public class GetHBase extends AbstractProcessor {
private final long latestTimestamp;
private final Map<String, Set<String>> matchingCellHashes;
private static final Pattern CELL_ID_PATTERN = Pattern.compile(Pattern.quote(StateKeys.ROW_ID_PREFIX) + "(\\d+)(\\.(\\d+))?");
/**
 * Names of the keys used when a ScanResult is flattened into a String-to-String map
 * by {@code toFlatMap()} and read back by {@code fromFlatMap()}.
 */
public static class StateKeys {
// key whose value is the scan's latest timestamp, written with String.valueOf(long)
public static final String TIMESTAMP = "timestamp";
// prefix of the per-row keys: "row.<rowIndex>" holds the row ID and
// "row.<rowIndex>.<cellIndex>" holds one cell value recorded for that row
public static final String ROW_ID_PREFIX = "row.";
}
public ScanResult(final long timestamp, final Map<String, Set<String>> cellHashes) {
latestTimestamp = timestamp;
matchingCellHashes = cellHashes;
@ -543,6 +559,81 @@ public class GetHBase extends AbstractProcessor {
final String cellHash = new String(cellValue, StandardCharsets.UTF_8);
return cellHashes.contains(cellHash);
}
/**
 * Flattens this result into a String-to-String map.
 * <p>
 * The map holds the timestamp under {@code StateKeys.TIMESTAMP}, each row ID under
 * {@code row.<rowIndex>}, and each of that row's cell values under
 * {@code row.<rowIndex>.<cellIndex>}. Both indexes start at 0 and follow the iteration
 * order of the underlying collections.
 *
 * @return a new map holding the flattened state
 */
public Map<String, String> toFlatMap() {
    final Map<String, String> flattened = new HashMap<>();
    flattened.put(StateKeys.TIMESTAMP, String.valueOf(latestTimestamp));

    int rowIndex = 0;
    for (final Map.Entry<String, Set<String>> rowEntry : matchingCellHashes.entrySet()) {
        final String rowKey = StateKeys.ROW_ID_PREFIX + rowIndex;
        flattened.put(rowKey, rowEntry.getKey());

        int cellIndex = 0;
        for (final String cellHash : rowEntry.getValue()) {
            flattened.put(rowKey + "." + cellIndex, cellHash);
            cellIndex++;
        }

        rowIndex++;
    }

    return flattened;
}
/**
 * Rebuilds a ScanResult from the flat form produced by {@code toFlatMap()}.
 * <p>
 * Keys that do not match {@code CELL_ID_PATTERN} (for example the timestamp key) are ignored.
 * A key of the form {@code row.<rowIndex>} supplies a row ID; a key of the form
 * {@code row.<rowIndex>.<cellIndex>} supplies one cell value for that row.
 *
 * @param map the flattened state; may be {@code null}
 * @return the reconstructed result, or {@code null} if {@code map} is {@code null} or holds no timestamp
 * @throws NumberFormatException if the stored timestamp is not a parsable long
 */
public static ScanResult fromFlatMap(final Map<String, String> map) {
    if (map == null) {
        return null;
    }

    final String timestampValue = map.get(StateKeys.TIMESTAMP);
    if (timestampValue == null) {
        return null;
    }

    final long timestamp = Long.parseLong(timestampValue);
    final Map<String, Set<String>> rowIndexToMatchingCellHashes = new HashMap<>();
    final Map<String, String> rowIndexToId = new HashMap<>();

    for (final Map.Entry<String, String> entry : map.entrySet()) {
        final Matcher matcher = CELL_ID_PATTERN.matcher(entry.getKey());
        if (!matcher.matches()) {
            // if it's not a valid key, move on.
            continue;
        }

        final String rowIndex = matcher.group(1);
        final String cellIndex = matcher.group(3);

        // Register the row index even when this entry only carries the Row ID, so that a
        // row without any cell values still appears in the result with an empty set.
        Set<String> cellHashes = rowIndexToMatchingCellHashes.get(rowIndex);
        if (cellHashes == null) {
            cellHashes = new HashSet<>();
            rowIndexToMatchingCellHashes.put(rowIndex, cellHashes);
        }

        if (cellIndex == null) {
            // this provides a Row ID.
            rowIndexToId.put(rowIndex, entry.getValue());
        } else {
            cellHashes.add(entry.getValue());
        }
    }

    final Map<String, Set<String>> matchingCellHashes = new HashMap<>(rowIndexToMatchingCellHashes.size());
    for (final Map.Entry<String, Set<String>> entry : rowIndexToMatchingCellHashes.entrySet()) {
        final String rowId = rowIndexToId.get(entry.getKey());
        if (rowId == null) {
            // Cell values were found for this index but no "row.<rowIndex>" entry names the row.
            // Skip them rather than storing them under a null Row ID.
            continue;
        }

        matchingCellHashes.put(rowId, entry.getValue());
    }

    return new ScanResult(timestamp, matchingCellHashes);
}
}
}

View File

@ -16,11 +16,31 @@
*/
package org.apache.nifi.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.hbase.GetHBase.ScanResult;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.reporting.InitializationException;
@ -31,17 +51,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TestGetHBase {
private TestRunner runner;
@ -148,27 +157,17 @@ public class TestGetHBase {
hBaseClient.addResult("row4", cells, now + 1);
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
runner.clearTransferState();
proc = new MockGetHBase(stateFile);
final TestRunner newRunner = TestRunners.newTestRunner(proc);
newRunner.addControllerService("cacheClient", cacheClient);
newRunner.enableControllerService(cacheClient);
newRunner.addControllerService("hbaseClient", hBaseClient);
newRunner.enableControllerService(hBaseClient);
newRunner.setProperty(GetHBase.TABLE_NAME, "nifi");
newRunner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
newRunner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
newRunner.run(100);
newRunner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
@Test
@ -271,8 +270,7 @@ public class TestGetHBase {
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
// should have a local state file and a cache entry before removing
Assert.assertTrue(proc.getStateFile().exists());
Assert.assertTrue(cacheClient.containsKey(proc.getKey(), new StringSerDe()));
runner.getStateManager().assertStateSet(Scope.CLUSTER);
proc.onRemoved(runner.getProcessContext());
@ -331,7 +329,7 @@ public class TestGetHBase {
}
@Test
public void testParseColumns() {
public void testParseColumns() throws IOException {
runner.setProperty(GetHBase.COLUMNS, "cf1,cf2:cq1,cf3");
proc.parseColumns(runner.getProcessContext());
@ -365,6 +363,47 @@ public class TestGetHBase {
runner.assertNotValid();
}
// Verifies that a ScanResult survives a round trip through toFlatMap() / fromFlatMap().
@Test
public void testScanResultConvert() {
    final long timestamp = 14L;

    // LinkedHashMap keeps the row order fixed, so "abc" is row 0 and "xyz" is row 1.
    final Map<String, Set<String>> cellHashes = new LinkedHashMap<>();

    final Set<String> row1Cells = new HashSet<>();
    row1Cells.add("hello");
    row1Cells.add("there");
    cellHashes.put("abc", row1Cells);

    final Set<String> row2Cells = new HashSet<>();
    row2Cells.add("good-bye");
    row2Cells.add("there");
    cellHashes.put("xyz", row2Cells);

    final ScanResult scanResult = new GetHBase.ScanResult(timestamp, cellHashes);
    final Map<String, String> flatMap = scanResult.toFlatMap();

    // 1 timestamp + 2 row IDs + 4 cell values
    assertEquals(7, flatMap.size());
    assertEquals(String.valueOf(timestamp), flatMap.get(ScanResult.StateKeys.TIMESTAMP));

    assertEquals("abc", flatMap.get("row.0"));
    final String row0Cell0 = flatMap.get("row.0.0");
    final String row0Cell1 = flatMap.get("row.0.1");
    assertTrue(row0Cell0.equals("hello") || row0Cell0.equals("there"));
    assertTrue(row0Cell1.equals("hello") || row0Cell1.equals("there"));
    // The cells come from a HashSet, so their order is unspecified, but the two values must differ.
    // Compare by value: a reference-identity check would also pass for two equal but distinct Strings.
    Assert.assertFalse(row0Cell0.equals(row0Cell1));

    assertEquals("xyz", flatMap.get("row.1"));
    final String row1Cell0 = flatMap.get("row.1.0");
    final String row1Cell1 = flatMap.get("row.1.1");
    assertTrue(row1Cell0.equals("good-bye") || row1Cell0.equals("there"));
    assertTrue(row1Cell1.equals("good-bye") || row1Cell1.equals("there"));
    Assert.assertFalse(row1Cell0.equals(row1Cell1));

    final ScanResult reverted = ScanResult.fromFlatMap(flatMap);
    assertEquals(timestamp, reverted.getTimestamp());
    assertEquals(cellHashes, reverted.getMatchingCells());
}
// Mock processor to override the location of the state file
private static class MockGetHBase extends GetHBase {

View File

@ -402,7 +402,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
persist(latestListingTimestamp, identifiers, context.getStateManager());
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi restarted before state is saved, or "
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
}