From e32689a60262110c9ff0ee6c4c9a3623ea789e57 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 12 Jul 2019 18:48:11 +0900 Subject: [PATCH] NIFI-6436 Fix NPE at StandardPublicPort Removing unnecessary ProcessGroup from PublicPort constructor. This closes #3580. Signed-off-by: Mark Payne --- .../org/apache/nifi/controller/AbstractPort.java | 5 ++--- .../org/apache/nifi/remote/RemoteGroupPort.java | 5 ++--- .../java/org/apache/nifi/connectable/LocalPort.java | 2 +- .../nifi/controller/flow/StandardFlowManager.java | 4 ++-- .../nifi/remote/StandardRemoteProcessGroup.java | 6 ++++-- .../org/apache/nifi/remote/StandardPublicPort.java | 13 +++++++------ .../apache/nifi/remote/StandardRemoteGroupPort.java | 5 ++--- .../apache/nifi/remote/TestStandardPublicPort.java | 8 +++++--- .../nifi/remote/TestStandardRemoteGroupPort.java | 2 +- 9 files changed, 26 insertions(+), 24 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 96b59fe56d..9af1bd115c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -70,7 +70,7 @@ public abstract class AbstractPort implements Port { private final AtomicReference name; private final AtomicReference position; private final AtomicReference comments; - private final AtomicReference processGroup; + private final AtomicReference processGroup = new AtomicReference<>(); private final AtomicBoolean lossTolerant; private final AtomicReference scheduledState; private final AtomicInteger concurrentTaskCount; @@ -89,7 +89,7 @@ public abstract class AbstractPort implements Port { private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { + public AbstractPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler) { this.id = requireNonNull(id); this.name = new AtomicReference<>(requireNonNull(name)); position = new AtomicReference<>(new Position(0D, 0D)); @@ -103,7 +103,6 @@ public abstract class AbstractPort implements Port { final List relationshipList = new ArrayList<>(); relationshipList.add(PORT_RELATIONSHIP); relationships = Collections.unmodifiableList(relationshipList); - this.processGroup = new AtomicReference<>(processGroup); this.type = type; penalizationPeriod = new AtomicReference<>("30 sec"); yieldPeriod = new AtomicReference<>("1 sec"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index 5d5fe7ddb7..53706879c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -20,13 +20,12 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination { - public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) { - super(id, name, processGroup, type, scheduler); + public RemoteGroupPort(String id, String name, ConnectableType type, ProcessScheduler scheduler) { + super(id, name, type, scheduler); } public abstract RemoteProcessGroup getRemoteProcessGroup(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index eff0c97714..7f1173e486 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -54,7 +54,7 @@ public class LocalPort extends AbstractPort { final int maxIterations; public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { - super(id, name, null, type, scheduler); + super(id, name, type, scheduler); int maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1")); setMaxConcurrentTasks(maxConcurrentTasks); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index f2f1bc71ba..a4fe16a447 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -139,7 +139,7 @@ public class StandardFlowManager implements FlowManager { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new StandardPublicPort(id, name, null, + return new StandardPublicPort(id, name, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties)); @@ -149,7 +149,7 @@ public class StandardFlowManager implements FlowManager { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new StandardPublicPort(id, name, null, + return new StandardPublicPort(id, name, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 87199f1346..48e21271c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -660,8 +660,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { throw new IllegalStateException("Output Port with ID " + descriptor.getId() + " already exists"); } - final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), + final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties); + port.setProcessGroup(getProcessGroup()); outputPorts.put(descriptor.getId(), port); if (descriptor.getConcurrentlySchedulableTaskCount() != null) { @@ -741,8 +742,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // all nodes in a cluster to use the same UUID. However, we want the ID to be // unique for each Remote Group Port, so that if we have multiple RPG's pointing // to the same target, we have unique ID's for each of those ports. - final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this, + final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), this, TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties); + port.setProcessGroup(getProcessGroup()); if (descriptor.getConcurrentlySchedulableTaskCount() != null) { port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java index 38e733fd9b..66cbc05d4b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java @@ -36,7 +36,6 @@ import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -101,12 +100,12 @@ public class StandardPublicPort extends AbstractPort implements PublicPort { private final Lock requestLock = new ReentrantLock(); private boolean shutdown = false; // guarded by requestLock - public StandardPublicPort(final String id, final String name, final ProcessGroup processGroup, + public StandardPublicPort(final String id, final String name, final TransferDirection direction, final ConnectableType type, final Authorizer authorizer, final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure, final String yieldPeriod, final List identityMappings) { - super(id, name, processGroup, type, scheduler); + super(id, name, type, scheduler); setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); this.authorizer = authorizer; @@ -121,10 +120,12 @@ public class StandardPublicPort extends AbstractPort implements PublicPort { @Override public void reportEvent(final Severity severity, final String category, final String message) { - final String groupId = processGroup.getIdentifier(); - final String groupName = processGroup.getName(); + final String groupId = StandardPublicPort.this.getProcessGroup().getIdentifier(); + final String groupName = StandardPublicPort.this.getProcessGroup().getName(); + final String sourceId = StandardPublicPort.this.getIdentifier(); + final String sourceName = StandardPublicPort.this.getName(); final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT; - bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, id, componentType, name, category, severity.name(), message)); + bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, sourceId, componentType, sourceName, category, severity.name(), message)); } }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 53e7c612f6..e51ea37558 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -28,7 +28,6 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -99,13 +98,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return clientRef.get(); } - public StandardRemoteGroupPort(final String id, final String targetId, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, + public StandardRemoteGroupPort(final String id, final String targetId, final String name, final RemoteProcessGroup remoteGroup, final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { // remote group port id needs to be unique but cannot just be the id of the port // in the remote group instance. this supports referencing the same remote // instance more than once. - super(id, name, processGroup, type, scheduler); + super(id, name, type, scheduler); this.targetId = targetId; this.remoteGroup = remoteGroup; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardPublicPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardPublicPort.java index 6ed48f40a0..218ea0e512 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardPublicPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardPublicPort.java @@ -57,9 +57,11 @@ public class TestStandardPublicPort { final ProcessGroup processGroup = mock(ProcessGroup.class); doReturn("process-group-id").when(processGroup).getIdentifier(); - return new StandardPublicPort("id", "name", processGroup, - TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, true, - nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties)); + final StandardPublicPort port = new StandardPublicPort("id", "name", + TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, true, + nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties)); + port.setProcessGroup(processGroup); + return port; } @Test diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 2ae7498bce..0831f51d01 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -121,7 +121,7 @@ public class TestStandardRemoteGroupPort { } port = spy(new StandardRemoteGroupPort(ID, ID, NAME, - processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null))); + remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null))); doReturn(true).when(remoteGroup).isTransmitting(); doReturn(protocol).when(remoteGroup).getTransportProtocol();