NIFI-6436 Fix NPE at StandardPublicPort

Removing unnecessary ProcessGroup from PublicPort constructor.

This closes #3580.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2019-07-12 18:48:11 +09:00 committed by Mark Payne
parent 16f3dbdc6a
commit e32689a602
9 changed files with 26 additions and 24 deletions

View File

@ -70,7 +70,7 @@ public abstract class AbstractPort implements Port {
private final AtomicReference<String> name; private final AtomicReference<String> name;
private final AtomicReference<Position> position; private final AtomicReference<Position> position;
private final AtomicReference<String> comments; private final AtomicReference<String> comments;
private final AtomicReference<ProcessGroup> processGroup; private final AtomicReference<ProcessGroup> processGroup = new AtomicReference<>();
private final AtomicBoolean lossTolerant; private final AtomicBoolean lossTolerant;
private final AtomicReference<ScheduledState> scheduledState; private final AtomicReference<ScheduledState> scheduledState;
private final AtomicInteger concurrentTaskCount; private final AtomicInteger concurrentTaskCount;
@ -89,7 +89,7 @@ public abstract class AbstractPort implements Port {
private final Lock readLock = rwLock.readLock(); private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock(); private final Lock writeLock = rwLock.writeLock();
public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { public AbstractPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler) {
this.id = requireNonNull(id); this.id = requireNonNull(id);
this.name = new AtomicReference<>(requireNonNull(name)); this.name = new AtomicReference<>(requireNonNull(name));
position = new AtomicReference<>(new Position(0D, 0D)); position = new AtomicReference<>(new Position(0D, 0D));
@ -103,7 +103,6 @@ public abstract class AbstractPort implements Port {
final List<Relationship> relationshipList = new ArrayList<>(); final List<Relationship> relationshipList = new ArrayList<>();
relationshipList.add(PORT_RELATIONSHIP); relationshipList.add(PORT_RELATIONSHIP);
relationships = Collections.unmodifiableList(relationshipList); relationships = Collections.unmodifiableList(relationshipList);
this.processGroup = new AtomicReference<>(processGroup);
this.type = type; this.type = type;
penalizationPeriod = new AtomicReference<>("30 sec"); penalizationPeriod = new AtomicReference<>("30 sec");
yieldPeriod = new AtomicReference<>("1 sec"); yieldPeriod = new AtomicReference<>("1 sec");

View File

@ -20,13 +20,12 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination { public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination {
public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) { public RemoteGroupPort(String id, String name, ConnectableType type, ProcessScheduler scheduler) {
super(id, name, processGroup, type, scheduler); super(id, name, type, scheduler);
} }
public abstract RemoteProcessGroup getRemoteProcessGroup(); public abstract RemoteProcessGroup getRemoteProcessGroup();

View File

@ -54,7 +54,7 @@ public class LocalPort extends AbstractPort {
final int maxIterations; final int maxIterations;
public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) {
super(id, name, null, type, scheduler); super(id, name, type, scheduler);
int maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1")); int maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1"));
setMaxConcurrentTasks(maxConcurrentTasks); setMaxConcurrentTasks(maxConcurrentTasks);

View File

@ -139,7 +139,7 @@ public class StandardFlowManager implements FlowManager {
id = requireNonNull(id).intern(); id = requireNonNull(id).intern();
name = requireNonNull(name).intern(); name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id); verifyPortIdDoesNotExist(id);
return new StandardPublicPort(id, name, null, return new StandardPublicPort(id, name,
TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, authorizer, bulletinRepository,
processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(), processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(),
IdentityMappingUtil.getIdentityMappings(nifiProperties)); IdentityMappingUtil.getIdentityMappings(nifiProperties));
@ -149,7 +149,7 @@ public class StandardFlowManager implements FlowManager {
id = requireNonNull(id).intern(); id = requireNonNull(id).intern();
name = requireNonNull(name).intern(); name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id); verifyPortIdDoesNotExist(id);
return new StandardPublicPort(id, name, null, return new StandardPublicPort(id, name,
TransferDirection.SEND, ConnectableType.OUTPUT_PORT, authorizer, bulletinRepository, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, authorizer, bulletinRepository,
processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(), processScheduler, isSiteToSiteSecure, nifiProperties.getBoredYieldDuration(),
IdentityMappingUtil.getIdentityMappings(nifiProperties)); IdentityMappingUtil.getIdentityMappings(nifiProperties));

View File

@ -660,8 +660,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
throw new IllegalStateException("Output Port with ID " + descriptor.getId() + " already exists"); throw new IllegalStateException("Output Port with ID " + descriptor.getId() + " already exists");
} }
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(),
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties); this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
port.setProcessGroup(getProcessGroup());
outputPorts.put(descriptor.getId(), port); outputPorts.put(descriptor.getId(), port);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) { if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@ -741,8 +742,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// all nodes in a cluster to use the same UUID. However, we want the ID to be // all nodes in a cluster to use the same UUID. However, we want the ID to be
// unique for each Remote Group Port, so that if we have multiple RPG's pointing // unique for each Remote Group Port, so that if we have multiple RPG's pointing
// to the same target, we have unique ID's for each of those ports. // to the same target, we have unique ID's for each of those ports.
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this, final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), this,
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties); TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
port.setProcessGroup(getProcessGroup());
if (descriptor.getConcurrentlySchedulableTaskCount() != null) { if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount()); port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount());

View File

@ -36,7 +36,6 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
@ -101,12 +100,12 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
private final Lock requestLock = new ReentrantLock(); private final Lock requestLock = new ReentrantLock();
private boolean shutdown = false; // guarded by requestLock private boolean shutdown = false; // guarded by requestLock
public StandardPublicPort(final String id, final String name, final ProcessGroup processGroup, public StandardPublicPort(final String id, final String name,
final TransferDirection direction, final ConnectableType type, final Authorizer authorizer, final TransferDirection direction, final ConnectableType type, final Authorizer authorizer,
final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure, final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure,
final String yieldPeriod, final List<IdentityMapping> identityMappings) { final String yieldPeriod, final List<IdentityMapping> identityMappings) {
super(id, name, processGroup, type, scheduler); super(id, name, type, scheduler);
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
this.authorizer = authorizer; this.authorizer = authorizer;
@ -121,10 +120,12 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
@Override @Override
public void reportEvent(final Severity severity, final String category, final String message) { public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = processGroup.getIdentifier(); final String groupId = StandardPublicPort.this.getProcessGroup().getIdentifier();
final String groupName = processGroup.getName(); final String groupName = StandardPublicPort.this.getProcessGroup().getName();
final String sourceId = StandardPublicPort.this.getIdentifier();
final String sourceName = StandardPublicPort.this.getName();
final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT; final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, id, componentType, name, category, severity.name(), message)); bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, groupName, sourceId, componentType, sourceName, category, severity.name(), message));
} }
}; };

View File

@ -28,7 +28,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -99,13 +98,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return clientRef.get(); return clientRef.get();
} }
public StandardRemoteGroupPort(final String id, final String targetId, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, public StandardRemoteGroupPort(final String id, final String targetId, final String name, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler, final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler,
final NiFiProperties nifiProperties) { final NiFiProperties nifiProperties) {
// remote group port id needs to be unique but cannot just be the id of the port // remote group port id needs to be unique but cannot just be the id of the port
// in the remote group instance. this supports referencing the same remote // in the remote group instance. this supports referencing the same remote
// instance more than once. // instance more than once.
super(id, name, processGroup, type, scheduler); super(id, name, type, scheduler);
this.targetId = targetId; this.targetId = targetId;
this.remoteGroup = remoteGroup; this.remoteGroup = remoteGroup;

View File

@ -57,9 +57,11 @@ public class TestStandardPublicPort {
final ProcessGroup processGroup = mock(ProcessGroup.class); final ProcessGroup processGroup = mock(ProcessGroup.class);
doReturn("process-group-id").when(processGroup).getIdentifier(); doReturn("process-group-id").when(processGroup).getIdentifier();
return new StandardPublicPort("id", "name", processGroup, final StandardPublicPort port = new StandardPublicPort("id", "name",
TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, true, TransferDirection.SEND, ConnectableType.INPUT_PORT, authorizer, bulletinRepository, processScheduler, true,
nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties)); nifiProperties.getBoredYieldDuration(), IdentityMappingUtil.getIdentityMappings(nifiProperties));
port.setProcessGroup(processGroup);
return port;
} }
@Test @Test

View File

@ -121,7 +121,7 @@ public class TestStandardRemoteGroupPort {
} }
port = spy(new StandardRemoteGroupPort(ID, ID, NAME, port = spy(new StandardRemoteGroupPort(ID, ID, NAME,
processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null))); remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null)));
doReturn(true).when(remoteGroup).isTransmitting(); doReturn(true).when(remoteGroup).isTransmitting();
doReturn(protocol).when(remoteGroup).getTransportProtocol(); doReturn(protocol).when(remoteGroup).getTransportProtocol();