NIFI-259: Bug fixes

This commit is contained in:
Mark Payne 2016-01-13 15:11:53 -05:00
parent 06f525b674
commit 0cd6f80f36
4 changed files with 16 additions and 9 deletions

View File

@ -553,6 +553,8 @@ Note, the above `kinit` command requires that Kerberos client libraries be insta
[source]
yum install krb5-workstation krb5-libs krb5-auth-dialog
Once this is complete, the /etc/krb5.conf will need to be configured appropriately for your organization's Kerberos envrionment.
Now, when we start NiFi, it will use Kerberos to authentication as the `nifi` user when communicating with ZooKeeper.

View File

@ -310,6 +310,9 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
invalidateClient();
setState(stateValues, version, componentId);
}
if (Code.NODEEXISTS == ke.code()) {
setState(stateValues, version, componentId);
}
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
} catch (final IOException ioe) {

View File

@ -209,12 +209,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
// Check if state already exists for this path. If so, we have already migrated the state.
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
if (stateMap.getVersion() == -1L) {
try {
// Migrate state from the old way of managing state (distributed cache service and local file)
// to the new mechanism (State Manager).
migrateState(path, client, context.getStateManager());
migrateState(path, client, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
throw new IOException("Failed to properly migrate state to State Manager", ioe);
}
@ -237,7 +237,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (resetListing) {
context.getStateManager().clear(Scope.CLUSTER);
context.getStateManager().clear(getStateScope(context));
resetListing = false;
}
}
@ -250,9 +250,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* @param path the path to migrate state for
* @param client the DistributedMapCacheClient that is capable of obtaining the current state
* @param stateManager the StateManager to use in order to store the new state
* @param scope the scope to use
* @throws IOException if unable to retrieve or store the state
*/
private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager) throws IOException {
private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
Long minTimestamp = null;
final Set<String> latestIdentifiersListed = new HashSet<>();
@ -289,11 +290,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (minTimestamp != null) {
persist(minTimestamp, latestIdentifiersListed, stateManager);
persist(minTimestamp, latestIdentifiersListed, stateManager, scope);
}
}
private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager) throws IOException {
private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1);
updatedState.put(TIMESTAMP, String.valueOf(timestamp));
int counter = 0;
@ -301,7 +302,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final String index = String.valueOf(++counter);
updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
}
stateManager.setState(updatedState, Scope.CLUSTER);
stateManager.setState(updatedState, scope);
}
protected String getKey(final String directory) {
@ -322,7 +323,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// We need to fetch the state from the cluster if we don't yet know the last listing time,
// or if we were just elected the primary node
if (this.lastListingTime == null || electedPrimaryNode) {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final Map<String, String> stateValues = stateMap.toMap();
final String timestamp = stateValues.get(TIMESTAMP);
@ -409,7 +410,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
for (final T entity : newEntries) {
identifiers.add(entity.getIdentifier());
}
persist(latestListingTimestamp, identifiers, context.getStateManager());
persist(latestListingTimestamp, identifiers, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);

View File

@ -150,6 +150,7 @@ public class TailFile extends AbstractProcessor {
properties.add(ROLLING_FILENAME_PATTERN);
properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
properties.add(START_POSITION);
properties.add(FILE_LOCATION);
return properties;
}