NIFI-4436: Bug fixes

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2018-01-02 13:09:09 -05:00 committed by Bryan Bende
parent 20b539aac3
commit 118667a601
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 33 additions and 2 deletions

View File

@ -3284,7 +3284,23 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
final Set<String> knownVariables = getKnownVariableNames(); final Set<String> knownVariables = getKnownVariableNames();
final StandardVersionControlInformation originalVci = this.versionControlInfo.get();
try {
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables); updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables);
} catch (final Throwable t) {
// The proposed snapshot may not have any Versioned Flow Coordinates. As a result, the call to #updateProcessGroup may
// set this PG's Version Control Info to null. During the normal flow of control,
// the Version Control Information is set appropriately at the end. However, if an Exception is thrown, we need to ensure
// that we don't leave the Version Control Info as null. It's also important to note here that the Atomic Reference is used
// as a means of retrieving the value without obtaining a read lock, but the value is never updated outside of a write lock.
// As a result, it is safe to use the get() and then the set() methods of the AtomicReference without introducing the 'check-then-modify' problem.
if (this.versionControlInfo.get() == null) {
this.versionControlInfo.set(originalVci);
}
throw t;
}
} catch (final ProcessorInstantiationException pie) { } catch (final ProcessorInstantiationException pie) {
throw new IllegalStateException("Failed to update flow", pie); throw new IllegalStateException("Failed to update flow", pie);
} finally { } finally {
@ -3613,7 +3629,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (connection == null) { if (connection == null) {
final Connection added = addConnection(group, proposedConnection, componentIdSeed); final Connection added = addConnection(group, proposedConnection, componentIdSeed);
LOG.info("Added {} to {}", added, this); LOG.info("Added {} to {}", added, this);
} else if (!connection.getSource().isRunning() && !connection.getDestination().isRunning()) { } else if (isUpdateable(connection)) {
// If the connection needs to be updated, then the source and destination will already have // If the connection needs to be updated, then the source and destination will already have
// been stopped (else, the validation above would fail). So if the source or the destination is running, // been stopped (else, the validation above would fail). So if the source or the destination is running,
// then we know that we don't need to update the connection. // then we know that we don't need to update the connection.
@ -3690,6 +3706,20 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
} }
private boolean isUpdateable(final Connection connection) {
final Connectable source = connection.getSource();
if (source.getConnectableType() != ConnectableType.FUNNEL && source.isRunning()) {
return false;
}
final Connectable destination = connection.getDestination();
if (destination.getConnectableType() != ConnectableType.FUNNEL && destination.isRunning()) {
return false;
}
return true;
}
private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) { private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits(); long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();

View File

@ -275,6 +275,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
public ProcessGroup disconnectVersionControl(final String groupId) { public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId); final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.disconnectVersionControl(true); group.disconnectVersionControl(true);
group.onComponentModified();
return group; return group;
} }