NIFI-259 Corrected some logic in AbstractListProcessor regarding being elected primary node and improved readability in GetHBase a smidge

This commit is contained in:
jpercivall 2016-02-04 16:07:18 -05:00
parent 6f4c3db186
commit 29fb9c9393
2 changed files with 12 additions and 14 deletions

View File

@ -107,7 +107,7 @@ public class GetHBase extends AbstractProcessor {
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
" so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
.required(false)
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor {
private volatile ScanResult lastResult = null;
private volatile List<Column> columns = new ArrayList<>();
private volatile boolean electedPrimaryNode = false;
private volatile boolean justElectedPrimaryNode = false;
private volatile String previousTable = null;
@Override
@ -236,9 +236,7 @@ public class GetHBase extends AbstractProcessor {
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
electedPrimaryNode = true;
}
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
@OnRemoved
@ -411,7 +409,7 @@ public class GetHBase extends AbstractProcessor {
lastResult = scanResult;
}
// save state to local storage and to distributed cache
// save state using the framework's state manager
storeState(lastResult, context.getStateManager());
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
@ -478,7 +476,7 @@ public class GetHBase extends AbstractProcessor {
ScanResult scanResult = lastResult;
// if we have no previous result, or we just became primary, pull from distributed cache
if (scanResult == null || electedPrimaryNode) {
if (scanResult == null || justElectedPrimaryNode) {
if (client != null) {
final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
if (obj == null || !(obj instanceof ScanResult)) {
@ -490,7 +488,7 @@ public class GetHBase extends AbstractProcessor {
}
// no requirement to pull an update from the distributed cache anymore.
electedPrimaryNode = false;
justElectedPrimaryNode = false;
}
// Check the persistence file. We want to use the latest timestamp that we have so that

View File

@ -163,7 +163,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private volatile Set<String> latestIdentifiersListed = new HashSet<>();
private volatile Long lastListingTime = null;
private volatile boolean electedPrimaryNode = false;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetListing = false;
static final String TIMESTAMP = "timestamp";
@ -198,9 +198,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
electedPrimaryNode = true;
}
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
@OnScheduled
@ -222,7 +220,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// delete the local file, since it is no longer needed
final File localFile = new File(path);
if (localFile.exists() && !!localFile.delete()) {
if (localFile.exists() && !localFile.delete()) {
getLogger().warn("Migrated state but failed to delete local persistence file");
}
@ -322,7 +320,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
try {
// We need to fetch the state from the cluster if we don't yet know the last listing time,
// or if we were just elected the primary node
if (this.lastListingTime == null || electedPrimaryNode) {
if (this.lastListingTime == null || justElectedPrimaryNode) {
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final Map<String, String> stateValues = stateMap.toMap();
final String timestamp = stateValues.get(TIMESTAMP);
@ -343,6 +341,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
latestIdentifiersListed.add(value);
}
}
justElectedPrimaryNode = false;
}
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");