mirror of https://github.com/apache/nifi.git
NIFI-2339 made exception statements more vague and generally limited to identifiers only to avoid any authorization issues. This closes #764
This commit is contained in:
parent
6bcc415eb8
commit
c10d11d378
|
@ -152,6 +152,6 @@ public class ConnectableDTO {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ConnectableDTO [Type=" + type + ", Name=" + name + ", Id=" + id + "]";
|
return "ConnectableDTO [Id=" + id + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -232,6 +232,6 @@ public class ConnectionDTO extends ComponentDTO {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ConnectionDTO [name: " + name + " from " + source + " to " + destination + "]";
|
return "ConnectionDTO [id: " + getId() + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,11 +134,11 @@ public abstract class AbstractPort implements Port {
|
||||||
final ProcessGroup parentGroup = this.processGroup.get();
|
final ProcessGroup parentGroup = this.processGroup.get();
|
||||||
if (getConnectableType() == ConnectableType.INPUT_PORT) {
|
if (getConnectableType() == ConnectableType.INPUT_PORT) {
|
||||||
if (parentGroup.getInputPortByName(name) != null) {
|
if (parentGroup.getInputPortByName(name) != null) {
|
||||||
throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Input Port named " + name);
|
throw new IllegalStateException("The requested new port name is not available");
|
||||||
}
|
}
|
||||||
} else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
|
} else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
|
||||||
if (parentGroup.getOutputPortByName(name) != null) {
|
if (parentGroup.getOutputPortByName(name) != null) {
|
||||||
throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Output Port named " + name);
|
throw new IllegalStateException("The requested new port name is not available");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,12 +292,12 @@ public abstract class AbstractPort implements Port {
|
||||||
|
|
||||||
if (!canConnectionBeRemoved(connection)) {
|
if (!canConnectionBeRemoved(connection)) {
|
||||||
// TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
|
// TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
|
||||||
throw new IllegalStateException(connection + " cannot be removed");
|
throw new IllegalStateException("Connection " + connection.getIdentifier() + " cannot be removed");
|
||||||
}
|
}
|
||||||
|
|
||||||
final boolean removed = outgoingConnections.remove(connection);
|
final boolean removed = outgoingConnections.remove(connection);
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
throw new IllegalStateException(connection + " is not registered with " + this);
|
throw new IllegalStateException("Connection " + connection.getIdentifier() + " is not registered with " + this.getIdentifier());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -369,7 +369,7 @@ public abstract class AbstractPort implements Port {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", getName()).append("id", getIdentifier()).toString();
|
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -528,7 +528,7 @@ public abstract class AbstractPort implements Port {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this + " is running");
|
throw new IllegalStateException(this.getIdentifier() + " is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ignoreConnections) {
|
if (!ignoreConnections) {
|
||||||
|
@ -540,7 +540,7 @@ public abstract class AbstractPort implements Port {
|
||||||
if (connection.getSource().equals(this)) {
|
if (connection.getSource().equals(this)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException(this + " is the destination of another component");
|
throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -555,9 +555,9 @@ public abstract class AbstractPort implements Port {
|
||||||
try {
|
try {
|
||||||
switch (scheduledState.get()) {
|
switch (scheduledState.get()) {
|
||||||
case DISABLED:
|
case DISABLED:
|
||||||
throw new IllegalStateException(this + " cannot be started because it is disabled");
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
throw new IllegalStateException(this + " cannot be started because it is already running");
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
|
||||||
case STOPPED:
|
case STOPPED:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -565,7 +565,7 @@ public abstract class AbstractPort implements Port {
|
||||||
|
|
||||||
final Collection<ValidationResult> validationResults = getValidationErrors();
|
final Collection<ValidationResult> validationResults = getValidationErrors();
|
||||||
if (!validationResults.isEmpty()) {
|
if (!validationResults.isEmpty()) {
|
||||||
throw new IllegalStateException(this + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
|
throw new IllegalStateException(this.getIdentifier() + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
@ -575,7 +575,7 @@ public abstract class AbstractPort implements Port {
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStop() {
|
public void verifyCanStop() {
|
||||||
if (getScheduledState() != ScheduledState.RUNNING) {
|
if (getScheduledState() != ScheduledState.RUNNING) {
|
||||||
throw new IllegalStateException(this + " is not scheduled to run");
|
throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -584,7 +584,7 @@ public abstract class AbstractPort implements Port {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this + " is not stopped");
|
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
@ -596,7 +596,7 @@ public abstract class AbstractPort implements Port {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (getScheduledState() != ScheduledState.DISABLED) {
|
if (getScheduledState() != ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(this + " is not disabled");
|
throw new IllegalStateException(this.getIdentifier() + " is not disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
|
@ -610,7 +610,7 @@ public abstract class AbstractPort implements Port {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (getScheduledState() != ScheduledState.STOPPED) {
|
if (getScheduledState() != ScheduledState.STOPPED) {
|
||||||
throw new IllegalStateException(this + " is not stopped");
|
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
||||||
}
|
}
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -621,7 +621,7 @@ public abstract class AbstractPort implements Port {
|
||||||
private void verifyNoActiveThreads() throws IllegalStateException {
|
private void verifyNoActiveThreads() throws IllegalStateException {
|
||||||
final int threadCount = processScheduler.getActiveThreadCount(this);
|
final int threadCount = processScheduler.getActiveThreadCount(this);
|
||||||
if (threadCount > 0) {
|
if (threadCount > 0) {
|
||||||
throw new IllegalStateException(this + " has " + threadCount + " threads still active");
|
throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -217,7 +217,7 @@ public class StandardFunnel implements Funnel {
|
||||||
|
|
||||||
final boolean removed = outgoingConnections.remove(connection);
|
final boolean removed = outgoingConnections.remove(connection);
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
throw new IllegalStateException(connection + " is not registered with " + this);
|
throw new IllegalStateException(connection.getIdentifier() + " is not registered with " + this.getIdentifier());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -505,7 +505,7 @@ public class StandardFunnel implements Funnel {
|
||||||
if (connection.getSource().equals(this)) {
|
if (connection.getSource().equals(this)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException(this + " is the destination of another component");
|
throw new IllegalStateException("Funnel " + this.getIdentifier() + " is the destination of another component");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -303,7 +303,7 @@ public final class StandardConnection implements Connection {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships();
|
return "Connection[Source ID=" + id + ",Dest ID=" + getDestination().getIdentifier() + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -453,19 +453,19 @@ public final class StandardConnection implements Connection {
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (!flowFileQueue.isEmpty()) {
|
if (!flowFileQueue.isEmpty()) {
|
||||||
throw new IllegalStateException("Queue not empty for " + this);
|
throw new IllegalStateException("Queue not empty for " + this.getIdentifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (source.isRunning()) {
|
if (source.isRunning()) {
|
||||||
if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
|
if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
|
||||||
throw new IllegalStateException("Source of Connection (" + source + ") is running");
|
throw new IllegalStateException("Source of Connection (" + source.getIdentifier() + ") is running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final Connectable dest = destination.get();
|
final Connectable dest = destination.get();
|
||||||
if (dest.isRunning()) {
|
if (dest.isRunning()) {
|
||||||
if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
|
if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
|
||||||
throw new IllegalStateException("Destination of Connection (" + dest + ") is running");
|
throw new IllegalStateException("Destination of Connection (" + dest.getIdentifier() + ") is running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1957,14 +1957,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
// validate the names of Input Ports
|
// validate the names of Input Ports
|
||||||
for (final PortDTO port : templateContents.getInputPorts()) {
|
for (final PortDTO port : templateContents.getInputPorts()) {
|
||||||
if (group.getInputPortByName(port.getName()) != null) {
|
if (group.getInputPortByName(port.getName()) != null) {
|
||||||
throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName());
|
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate the names of Output Ports
|
// validate the names of Output Ports
|
||||||
for (final PortDTO port : templateContents.getOutputPorts()) {
|
for (final PortDTO port : templateContents.getOutputPorts()) {
|
||||||
if (group.getOutputPortByName(port.getName()) != null) {
|
if (group.getOutputPortByName(port.getName()) != null) {
|
||||||
throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName());
|
throw new IllegalStateException("One or more of the proposed Port names is not available in the process group");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2906,7 +2906,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
@Override
|
@Override
|
||||||
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
|
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
|
||||||
if (isTerminated()) {
|
if (isTerminated()) {
|
||||||
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
|
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
|
||||||
}
|
}
|
||||||
|
|
||||||
reportingTaskNode.verifyCanStart();
|
reportingTaskNode.verifyCanStart();
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.controller;
|
package org.apache.nifi.controller;
|
||||||
|
|
||||||
import org.apache.nifi.controller.Counter;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
public class StandardCounter implements Counter {
|
public class StandardCounter implements Counter {
|
||||||
|
@ -33,33 +32,39 @@ public class StandardCounter implements Counter {
|
||||||
this.value = new AtomicLong(0L);
|
this.value = new AtomicLong(0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void adjust(final long delta) {
|
public void adjust(final long delta) {
|
||||||
this.value.addAndGet(delta);
|
this.value.addAndGet(delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getValue() {
|
public long getValue() {
|
||||||
return this.value.get();
|
return this.value.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getContext() {
|
public String getContext() {
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getIdentifier() {
|
public String getIdentifier() {
|
||||||
return identifier;
|
return identifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
this.value.set(0);
|
this.value.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Counter[identifier=" + identifier + ", context=" + context + ", name=" + name + ", value=" + value + ']';
|
return "Counter[identifier=" + identifier + ']';
|
||||||
}
|
}
|
||||||
|
|
||||||
public static UnmodifiableCounter unmodifiableCounter(final Counter counter) {
|
public static UnmodifiableCounter unmodifiableCounter(final Counter counter) {
|
||||||
|
|
|
@ -894,14 +894,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||||
final Set<String> userControls = portDTO.getUserAccessControl();
|
final Set<String> userControls = portDTO.getUserAccessControl();
|
||||||
if (userControls != null && !userControls.isEmpty()) {
|
if (userControls != null && !userControls.isEmpty()) {
|
||||||
if (!(port instanceof RootGroupPort)) {
|
if (!(port instanceof RootGroupPort)) {
|
||||||
throw new IllegalStateException("Attempting to add User Access Controls to " + port + ", but it is not a RootGroupPort");
|
throw new IllegalStateException("Attempting to add User Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
|
||||||
}
|
}
|
||||||
((RootGroupPort) port).setUserAccessControl(userControls);
|
((RootGroupPort) port).setUserAccessControl(userControls);
|
||||||
}
|
}
|
||||||
final Set<String> groupControls = portDTO.getGroupAccessControl();
|
final Set<String> groupControls = portDTO.getGroupAccessControl();
|
||||||
if (groupControls != null && !groupControls.isEmpty()) {
|
if (groupControls != null && !groupControls.isEmpty()) {
|
||||||
if (!(port instanceof RootGroupPort)) {
|
if (!(port instanceof RootGroupPort)) {
|
||||||
throw new IllegalStateException("Attempting to add Group Access Controls to " + port + ", but it is not a RootGroupPort");
|
throw new IllegalStateException("Attempting to add Group Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
|
||||||
}
|
}
|
||||||
((RootGroupPort) port).setGroupAccessControl(groupControls);
|
((RootGroupPort) port).setGroupAccessControl(groupControls);
|
||||||
}
|
}
|
||||||
|
@ -937,14 +937,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||||
final Set<String> userControls = portDTO.getUserAccessControl();
|
final Set<String> userControls = portDTO.getUserAccessControl();
|
||||||
if (userControls != null && !userControls.isEmpty()) {
|
if (userControls != null && !userControls.isEmpty()) {
|
||||||
if (!(port instanceof RootGroupPort)) {
|
if (!(port instanceof RootGroupPort)) {
|
||||||
throw new IllegalStateException("Attempting to add User Access Controls to " + port + ", but it is not a RootGroupPort");
|
throw new IllegalStateException("Attempting to add User Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
|
||||||
}
|
}
|
||||||
((RootGroupPort) port).setUserAccessControl(userControls);
|
((RootGroupPort) port).setUserAccessControl(userControls);
|
||||||
}
|
}
|
||||||
final Set<String> groupControls = portDTO.getGroupAccessControl();
|
final Set<String> groupControls = portDTO.getGroupAccessControl();
|
||||||
if (groupControls != null && !groupControls.isEmpty()) {
|
if (groupControls != null && !groupControls.isEmpty()) {
|
||||||
if (!(port instanceof RootGroupPort)) {
|
if (!(port instanceof RootGroupPort)) {
|
||||||
throw new IllegalStateException("Attempting to add Group Access Controls to " + port + ", but it is not a RootGroupPort");
|
throw new IllegalStateException("Attempting to add Group Access Controls to " + port.getIdentifier() + ", but it is not a RootGroupPort");
|
||||||
}
|
}
|
||||||
((RootGroupPort) port).setGroupAccessControl(groupControls);
|
((RootGroupPort) port).setGroupAccessControl(groupControls);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1085,7 +1085,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete(final boolean ignoreConnections) {
|
public void verifyCanDelete(final boolean ignoreConnections) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this + " is running");
|
throw new IllegalStateException(this.getIdentifier() + " is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ignoreConnections) {
|
if (!ignoreConnections) {
|
||||||
|
@ -1099,7 +1099,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
if (connection.getSource().equals(this)) {
|
if (connection.getSource().equals(this)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException(this + " is the destination of another component");
|
throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1114,7 +1114,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
||||||
final ScheduledState currentState = getPhysicalScheduledState();
|
final ScheduledState currentState = getPhysicalScheduledState();
|
||||||
if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.DISABLED) {
|
if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(this + " cannot be started because it is not stopped. Current state is " + currentState.name());
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not stopped. Current state is " + currentState.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
|
@ -1128,12 +1128,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
||||||
for (final ValidationResult result : validationResults) {
|
for (final ValidationResult result : validationResults) {
|
||||||
if (!result.isValid()) {
|
if (!result.isValid()) {
|
||||||
throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!isValid()) {
|
if (!isValid()) {
|
||||||
throw new IllegalStateException(this + " is not in a valid state");
|
throw new IllegalStateException(this.getIdentifier() + " is not in a valid state");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1141,21 +1141,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStop() {
|
public void verifyCanStop() {
|
||||||
if (getScheduledState() != ScheduledState.RUNNING) {
|
if (getScheduledState() != ScheduledState.RUNNING) {
|
||||||
throw new IllegalStateException(this + " is not scheduled to run");
|
throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this + " is not stopped");
|
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable() {
|
public void verifyCanEnable() {
|
||||||
if (getScheduledState() != ScheduledState.DISABLED) {
|
if (getScheduledState() != ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(this + " is not disabled");
|
throw new IllegalStateException(this.getIdentifier() + " is not disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
|
@ -1164,7 +1164,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDisable() {
|
public void verifyCanDisable() {
|
||||||
if (getScheduledState() != ScheduledState.STOPPED) {
|
if (getScheduledState() != ScheduledState.STOPPED) {
|
||||||
throw new IllegalStateException(this + " is not stopped");
|
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
||||||
}
|
}
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
}
|
}
|
||||||
|
@ -1177,7 +1177,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
private void verifyNoActiveThreads() throws IllegalStateException {
|
private void verifyNoActiveThreads() throws IllegalStateException {
|
||||||
final int threadCount = processScheduler.getActiveThreadCount(this);
|
final int threadCount = processScheduler.getActiveThreadCount(this);
|
||||||
if (threadCount > 0) {
|
if (threadCount > 0) {
|
||||||
throw new IllegalStateException(this + " has " + threadCount + " threads still active");
|
throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -163,50 +163,50 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
|
throw new IllegalStateException("Cannot delete " + reportingTask.getIdentifier() + " because it is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDisable() {
|
public void verifyCanDisable() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running");
|
throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is currently running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isDisabled()) {
|
if (isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled");
|
throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is already disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable() {
|
public void verifyCanEnable() {
|
||||||
if (!isDisabled()) {
|
if (!isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled");
|
throw new IllegalStateException("Cannot enable " + reportingTask.getIdentifier() + " because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStart() {
|
public void verifyCanStart() {
|
||||||
if (isDisabled()) {
|
if (isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled");
|
throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is currently disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running");
|
throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is already running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStop() {
|
public void verifyCanStop() {
|
||||||
if (!isRunning()) {
|
if (!isRunning()) {
|
||||||
throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running");
|
throw new IllegalStateException("Cannot stop " + reportingTask.getIdentifier() + " because it is not running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
|
throw new IllegalStateException("Cannot update " + reportingTask.getIdentifier() + " because it is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,15 +219,15 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
||||||
switch (getScheduledState()) {
|
switch (getScheduledState()) {
|
||||||
case DISABLED:
|
case DISABLED:
|
||||||
throw new IllegalStateException(this + " cannot be started because it is disabled");
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
throw new IllegalStateException(this + " cannot be started because it is already running");
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
|
||||||
case STOPPED:
|
case STOPPED:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
final int activeThreadCount = getActiveThreadCount();
|
final int activeThreadCount = getActiveThreadCount();
|
||||||
if (activeThreadCount > 0) {
|
if (activeThreadCount > 0) {
|
||||||
throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already");
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it has " + activeThreadCount + " active threads already");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<String> ids = new HashSet<>();
|
final Set<String> ids = new HashSet<>();
|
||||||
|
@ -238,14 +238,14 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
||||||
for (final ValidationResult result : validationResults) {
|
for (final ValidationResult result : validationResults) {
|
||||||
if (!result.isValid()) {
|
if (!result.isValid()) {
|
||||||
throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
|
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
|
return "ReportingTask[id=" + getIdentifier() + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
|
||||||
public void doSchedule(final ReportingTaskNode taskNode, final ScheduleState scheduleState) {
|
public void doSchedule(final ReportingTaskNode taskNode, final ScheduleState scheduleState) {
|
||||||
final List<AtomicBoolean> existingTriggers = canceledTriggers.get(taskNode);
|
final List<AtomicBoolean> existingTriggers = canceledTriggers.get(taskNode);
|
||||||
if (existingTriggers != null) {
|
if (existingTriggers != null) {
|
||||||
throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask() + " because it is already scheduled to run");
|
throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run");
|
||||||
}
|
}
|
||||||
|
|
||||||
final String cronSchedule = taskNode.getSchedulingPeriod();
|
final String cronSchedule = taskNode.getSchedulingPeriod();
|
||||||
|
@ -85,7 +85,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
|
||||||
try {
|
try {
|
||||||
cronExpression = new CronExpression(cronSchedule);
|
cronExpression = new CronExpression(cronSchedule);
|
||||||
} catch (final Exception pe) {
|
} catch (final Exception pe) {
|
||||||
throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask() + " to run because its scheduling period is not valid");
|
throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period is not valid");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
|
final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
|
||||||
|
|
|
@ -380,7 +380,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
@Override
|
@Override
|
||||||
public void startPort(final Port port) {
|
public void startPort(final Port port) {
|
||||||
if (!port.isValid()) {
|
if (!port.isValid()) {
|
||||||
throw new IllegalStateException("Port " + port.getName() + " is not in a valid state");
|
throw new IllegalStateException("Port " + port.getIdentifier() + " is not in a valid state");
|
||||||
}
|
}
|
||||||
|
|
||||||
port.onSchedulingStart();
|
port.onSchedulingStart();
|
||||||
|
@ -407,7 +407,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
|
|
||||||
private synchronized void startConnectable(final Connectable connectable) {
|
private synchronized void startConnectable(final Connectable connectable) {
|
||||||
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
|
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(connectable + " is disabled, so it cannot be started");
|
throw new IllegalStateException(connectable.getIdentifier() + " is disabled, so it cannot be started");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ScheduleState scheduleState = getScheduleState(requireNonNull(connectable));
|
final ScheduleState scheduleState = getScheduleState(requireNonNull(connectable));
|
||||||
|
|
|
@ -16,22 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.controller.service;
|
package org.apache.nifi.controller.service;
|
||||||
|
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.authorization.Resource;
|
import org.apache.nifi.authorization.Resource;
|
||||||
|
@ -53,6 +38,22 @@ import org.apache.nifi.util.ReflectionUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
|
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
|
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
|
||||||
|
@ -214,7 +215,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
if (getState() != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be deleted because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,39 +227,39 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
|
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
|
||||||
if (!this.isActive()) {
|
if (!this.isActive()) {
|
||||||
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
|
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ControllerServiceReference references = getReferences();
|
final ControllerServiceReference references = getReferences();
|
||||||
|
|
||||||
final Set<ConfiguredComponent> activeReferences = new HashSet<>();
|
final Set<String> activeReferencesIdentifiers = new HashSet<>();
|
||||||
for (final ConfiguredComponent activeReference : references.getActiveReferences()) {
|
for (final ConfiguredComponent activeReference : references.getActiveReferences()) {
|
||||||
if (!ignoreReferences.contains(activeReference)) {
|
if (!ignoreReferences.contains(activeReference)) {
|
||||||
activeReferences.add(activeReference);
|
activeReferencesIdentifiers.add(activeReference.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!activeReferences.isEmpty()) {
|
if (!activeReferencesIdentifiers.isEmpty()) {
|
||||||
throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + activeReferences.size() +
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be disabled because it is referenced by " + activeReferencesIdentifiers.size() +
|
||||||
" components that are currently running: " + activeReferences);
|
" components that are currently running: [" + StringUtils.join(activeReferencesIdentifiers, ", ") + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable() {
|
public void verifyCanEnable() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
if (getState() != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isValid()) {
|
if (!isValid()) {
|
||||||
throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + getValidationErrors());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
|
public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
if (getState() != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<String> ids = new HashSet<>();
|
final Set<String> ids = new HashSet<>();
|
||||||
|
@ -269,7 +270,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
|
||||||
for (final ValidationResult result : validationResults) {
|
for (final ValidationResult result : validationResults) {
|
||||||
if (!result.isValid()) {
|
if (!result.isValid()) {
|
||||||
throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result);
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,7 +278,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
if (getState() != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
|
throw new IllegalStateException(implementation.getIdentifier() + " cannot be updated because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
|
|
||||||
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
|
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
|
||||||
final StateManagerProvider stateManagerProvider,final VariableRegistry variableRegistry) {
|
final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry) {
|
||||||
|
|
||||||
this.flowController = flowController;
|
this.flowController = flowController;
|
||||||
this.processScheduler = scheduler;
|
this.processScheduler = scheduler;
|
||||||
|
@ -96,7 +96,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
this.variableRegistry = variableRegistry;
|
this.variableRegistry = variableRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Class<?>[] getInterfaces(final Class<?> cls) {
|
private Class<?>[] getInterfaces(final Class<?> cls) {
|
||||||
final List<Class<?>> allIfcs = new ArrayList<>();
|
final List<Class<?>> allIfcs = new ArrayList<>();
|
||||||
populateInterfaces(cls, allIfcs);
|
populateInterfaces(cls, allIfcs);
|
||||||
|
@ -164,7 +163,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
if (disabled && !validDisabledMethods.contains(method)) {
|
if (disabled && !validDisabledMethods.contains(method)) {
|
||||||
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
|
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
|
||||||
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
||||||
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
|
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier()
|
||||||
|
+ " because the Controller Service is disabled");
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled");
|
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled");
|
||||||
}
|
}
|
||||||
|
@ -256,13 +256,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
};
|
};
|
||||||
|
|
||||||
final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(),
|
final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(),
|
||||||
new Class[] {ControllerService.class}, invocationHandler);
|
new Class[]{ControllerService.class}, invocationHandler);
|
||||||
|
|
||||||
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
|
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
|
||||||
final String componentType = "(Missing) " + simpleClassName;
|
final String componentType = "(Missing) " + simpleClassName;
|
||||||
|
|
||||||
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id,
|
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id,
|
||||||
new StandardValidationContextFactory(this,variableRegistry), this, componentType, type, variableRegistry);
|
new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry);
|
||||||
return serviceNode;
|
return serviceNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -539,7 +539,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
return getRootGroup().findControllerService(serviceIdentifier);
|
return getRootGroup().findControllerService(serviceIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
|
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
|
||||||
final Set<ControllerServiceNode> serviceNodes;
|
final Set<ControllerServiceNode> serviceNodes;
|
||||||
|
@ -595,8 +594,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a List of all components that reference the given referencedNode (either directly or indirectly through another service) that are also of the given componentType. The list that is
|
* Returns a List of all components that reference the given referencedNode
|
||||||
* returned is in the order in which they will need to be 'activated' (enabled/started).
|
* (either directly or indirectly through another service) that are also of
|
||||||
|
* the given componentType. The list that is returned is in the order in
|
||||||
|
* which they will need to be 'activated' (enabled/started).
|
||||||
*
|
*
|
||||||
* @param referencedNode node
|
* @param referencedNode node
|
||||||
* @param componentType type
|
* @param componentType type
|
||||||
|
@ -709,7 +710,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
// we can always stop referencing components
|
// we can always stop referencing components
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
|
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
|
||||||
throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier");
|
throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier");
|
||||||
|
|
|
@ -302,7 +302,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
node.getProcessGroup().startProcessor(node);
|
node.getProcessGroup().startProcessor(node);
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
LOG.error("Unable to start {} due to {}", new Object[]{node, t});
|
LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -326,7 +326,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
node.getProcessGroup().stopProcessor(node);
|
node.getProcessGroup().stopProcessor(node);
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
LOG.error("Unable to stop {} due to {}", new Object[]{node, t});
|
LOG.error("Unable to stop processor {} due to {}", new Object[]{node.getIdentifier(), t});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -349,7 +349,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
private void shutdown(final ProcessGroup procGroup) {
|
private void shutdown(final ProcessGroup procGroup) {
|
||||||
for (final ProcessorNode node : procGroup.getProcessors()) {
|
for (final ProcessorNode node : procGroup.getProcessors()) {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()),variableRegistry);
|
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry);
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -386,12 +386,9 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (inputPorts.containsKey(requireNonNull(port).getIdentifier())) {
|
if (inputPorts.containsKey(requireNonNull(port).getIdentifier())
|
||||||
throw new IllegalStateException("Input Port with ID " + port.getIdentifier() + " already exists");
|
|| getInputPortByName(port.getName()) != null) {
|
||||||
}
|
throw new IllegalStateException("The input port name or identifier is not available to be added.");
|
||||||
|
|
||||||
if (getInputPortByName(port.getName()) != null) {
|
|
||||||
throw new IllegalStateException("Input Port with name " + port.getName() + " already exists");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
port.setProcessGroup(this);
|
port.setProcessGroup(this);
|
||||||
|
@ -407,7 +404,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final Port toRemove = inputPorts.get(requireNonNull(port).getIdentifier());
|
final Port toRemove = inputPorts.get(requireNonNull(port).getIdentifier());
|
||||||
if (toRemove == null) {
|
if (toRemove == null) {
|
||||||
throw new IllegalStateException(port + " is not an Input Port of this Process Group");
|
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
port.verifyCanDelete();
|
port.verifyCanDelete();
|
||||||
|
@ -427,7 +424,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
final Port removed = inputPorts.remove(port.getIdentifier());
|
final Port removed = inputPorts.remove(port.getIdentifier());
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
throw new IllegalStateException(port + " is not an Input Port of this Process Group");
|
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Input Port {} removed from flow", port);
|
LOG.info("Input Port {} removed from flow", port);
|
||||||
|
@ -460,20 +457,17 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
public void addOutputPort(final Port port) {
|
public void addOutputPort(final Port port) {
|
||||||
if (isRootGroup()) {
|
if (isRootGroup()) {
|
||||||
if (!(port instanceof RootGroupPort)) {
|
if (!(port instanceof RootGroupPort)) {
|
||||||
throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to the Root Group");
|
throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to the Root Group");
|
||||||
}
|
}
|
||||||
} else if (!(port instanceof LocalPort)) {
|
} else if (!(port instanceof LocalPort)) {
|
||||||
throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to a non-root group");
|
throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to a non-root group");
|
||||||
}
|
}
|
||||||
|
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (outputPorts.containsKey(requireNonNull(port).getIdentifier())) {
|
if (outputPorts.containsKey(requireNonNull(port).getIdentifier())
|
||||||
throw new IllegalStateException("Output Port with ID " + port.getIdentifier() + " already exists");
|
|| getOutputPortByName(port.getName()) != null) {
|
||||||
}
|
throw new IllegalStateException("Output Port with given identifier or name is not available");
|
||||||
|
|
||||||
if (getOutputPortByName(port.getName()) != null) {
|
|
||||||
throw new IllegalStateException("Output Port with Name " + port.getName() + " already exists");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
port.setProcessGroup(this);
|
port.setProcessGroup(this);
|
||||||
|
@ -495,12 +489,12 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!toRemove.getConnections().isEmpty()) {
|
if (!toRemove.getConnections().isEmpty()) {
|
||||||
throw new IllegalStateException(port + " cannot be removed until its connections are removed");
|
throw new IllegalStateException(port.getIdentifier() + " cannot be removed until its connections are removed");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Port removed = outputPorts.remove(port.getIdentifier());
|
final Port removed = outputPorts.remove(port.getIdentifier());
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
throw new IllegalStateException(port + " is not an Output Port of this Process Group");
|
throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Output Port {} removed from flow", port);
|
LOG.info("Output Port {} removed from flow", port);
|
||||||
|
@ -572,7 +566,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final ProcessGroup toRemove = processGroups.get(group.getIdentifier());
|
final ProcessGroup toRemove = processGroups.get(group.getIdentifier());
|
||||||
if (toRemove == null) {
|
if (toRemove == null) {
|
||||||
throw new IllegalStateException(group + " is not a member of this Process Group");
|
throw new IllegalStateException(group.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
toRemove.verifyCanDelete();
|
toRemove.verifyCanDelete();
|
||||||
|
|
||||||
|
@ -651,7 +645,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final RemoteProcessGroup remoteGroup = remoteGroups.get(remoteGroupId);
|
final RemoteProcessGroup remoteGroup = remoteGroups.get(remoteGroupId);
|
||||||
if (remoteGroup == null) {
|
if (remoteGroup == null) {
|
||||||
throw new IllegalStateException(remoteProcessGroup + " is not a member of this Process Group");
|
throw new IllegalStateException(remoteProcessGroup.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteGroup.verifyCanDelete();
|
remoteGroup.verifyCanDelete();
|
||||||
|
@ -705,7 +699,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (!processors.containsKey(id)) {
|
if (!processors.containsKey(id)) {
|
||||||
throw new IllegalStateException(processor + " is not a member of this Process Group");
|
throw new IllegalStateException(processor.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
processor.verifyCanDelete();
|
processor.verifyCanDelete();
|
||||||
|
@ -714,10 +708,10 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()),variableRegistry);
|
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry);
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
|
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
|
for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
|
||||||
|
@ -836,11 +830,9 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
|
if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
|
||||||
throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group");
|
throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group");
|
||||||
}
|
}
|
||||||
} else {
|
} else if (destinationGroup != this) {
|
||||||
if (destinationGroup != this) {
|
|
||||||
throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group");
|
throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else { // source is not a port
|
} else { // source is not a port
|
||||||
if (sourceGroup != this) {
|
if (sourceGroup != this) {
|
||||||
throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group");
|
throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group");
|
||||||
|
@ -856,7 +848,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
+ "Port but the Input Port does not belong to a child Process Group");
|
+ "Port but the Input Port does not belong to a child Process Group");
|
||||||
}
|
}
|
||||||
} else if (destinationGroup != this) {
|
} else if (destinationGroup != this) {
|
||||||
throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination
|
throw new IllegalStateException("Cannot add Connection between " + source.getIdentifier() + " and " + destination.getIdentifier()
|
||||||
+ " because they are in different Process Groups and neither is an Input Port or Output Port");
|
+ " because they are in different Process Groups and neither is an Input Port or Output Port");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -909,7 +901,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
// verify that Connection belongs to this group
|
// verify that Connection belongs to this group
|
||||||
final Connection connection = connections.get(requireNonNull(connectionToRemove).getIdentifier());
|
final Connection connection = connections.get(requireNonNull(connectionToRemove).getIdentifier());
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
throw new IllegalStateException(connectionToRemove + " is not a member of this Process Group");
|
throw new IllegalStateException("Connection " + connectionToRemove.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
connectionToRemove.verifyCanDelete();
|
connectionToRemove.verifyCanDelete();
|
||||||
|
@ -1083,12 +1075,12 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (getInputPort(port.getIdentifier()) == null) {
|
if (getInputPort(port.getIdentifier()) == null) {
|
||||||
throw new IllegalStateException(port + " is not a member of this Process Group");
|
throw new IllegalStateException("Port " + port.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ScheduledState state = port.getScheduledState();
|
final ScheduledState state = port.getScheduledState();
|
||||||
if (state == ScheduledState.DISABLED) {
|
if (state == ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException("InputPort " + port + " is disabled");
|
throw new IllegalStateException("InputPort " + port.getIdentifier() + " is disabled");
|
||||||
} else if (state == ScheduledState.RUNNING) {
|
} else if (state == ScheduledState.RUNNING) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1364,7 +1356,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", name).toString();
|
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("identifier", getIdentifier()).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1752,7 +1744,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier());
|
final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier());
|
||||||
if (existing == null) {
|
if (existing == null) {
|
||||||
throw new IllegalStateException(funnel + " is not a member of this ProcessGroup");
|
throw new IllegalStateException("Funnel " + funnel.getIdentifier() + " is not a member of this ProcessGroup");
|
||||||
}
|
}
|
||||||
|
|
||||||
funnel.verifyCanDelete();
|
funnel.verifyCanDelete();
|
||||||
|
@ -1785,7 +1777,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addControllerService(final ControllerServiceNode service) {
|
public void addControllerService(final ControllerServiceNode service) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -1837,7 +1828,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
|
final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
|
||||||
if (existing == null) {
|
if (existing == null) {
|
||||||
throw new IllegalStateException(service + " is not a member of this Process Group");
|
throw new IllegalStateException("ControllerService " + service.getIdentifier() + " is not a member of this Process Group");
|
||||||
}
|
}
|
||||||
|
|
||||||
service.verifyCanDelete();
|
service.verifyCanDelete();
|
||||||
|
@ -1952,7 +1943,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
final Template existing = templates.get(requireNonNull(template).getIdentifier());
|
final Template existing = templates.get(requireNonNull(template).getIdentifier());
|
||||||
if (existing == null) {
|
if (existing == null) {
|
||||||
throw new IllegalStateException(template + " is not a member of this ProcessGroup");
|
throw new IllegalStateException("Template " + template.getIdentifier() + " is not a member of this ProcessGroup");
|
||||||
}
|
}
|
||||||
|
|
||||||
templates.remove(template.getIdentifier());
|
templates.remove(template.getIdentifier());
|
||||||
|
@ -1975,7 +1966,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
for (final Connectable connectable : connectables) {
|
for (final Connectable connectable : connectables) {
|
||||||
for (final Connection conn : connectable.getConnections()) {
|
for (final Connection conn : connectable.getConnections()) {
|
||||||
if (!connections.containsKey(conn.getIdentifier())) {
|
if (!connections.containsKey(conn.getIdentifier())) {
|
||||||
throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections from the parent Process Group");
|
throw new IllegalStateException("Connectable component " + connectable.getIdentifier()
|
||||||
|
+ " cannot be removed because it has incoming connections from the parent Process Group");
|
||||||
}
|
}
|
||||||
connectionIdsToRemove.add(conn.getIdentifier());
|
connectionIdsToRemove.add(conn.getIdentifier());
|
||||||
}
|
}
|
||||||
|
@ -1990,11 +1982,11 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
for (final String procId : snippet.getProcessors().keySet()) {
|
for (final String procId : snippet.getProcessors().keySet()) {
|
||||||
final ProcessorNode procNode = getProcessor(procId);
|
final ProcessorNode procNode = getProcessor(procId);
|
||||||
if (procNode.isRunning()) {
|
if (procNode.isRunning()) {
|
||||||
throw new IllegalStateException(procNode + " cannot be removed because it is running");
|
throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it is running");
|
||||||
}
|
}
|
||||||
final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
|
final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
|
||||||
if (activeThreadCount != 0) {
|
if (activeThreadCount != 0) {
|
||||||
throw new IllegalStateException(procNode + " cannot be removed because it still has " + activeThreadCount + " active threads");
|
throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it still has " + activeThreadCount + " active threads");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2003,7 +1995,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
for (final Connectable connectable : connectables) {
|
for (final Connectable connectable : connectables) {
|
||||||
for (final Connection conn : connectable.getIncomingConnections()) {
|
for (final Connection conn : connectable.getIncomingConnections()) {
|
||||||
if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
|
if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
|
||||||
throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections "
|
throw new IllegalStateException("Connectable component " + connectable.getIdentifier() + " cannot be removed because it has incoming connections "
|
||||||
+ "that are not selected to be deleted");
|
+ "that are not selected to be deleted");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2048,7 +2040,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
return (map == null) ? Collections.emptySet() : map.keySet();
|
return (map == null) ? Collections.emptySet() : map.keySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void move(final Snippet snippet, final ProcessGroup destination) {
|
public void move(final Snippet snippet, final ProcessGroup destination) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -2185,11 +2176,14 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifies that all ID's defined within the given snippet reference components within this ProcessGroup. If this is not the case, throws {@link IllegalStateException}.
|
* Verifies that all ID's defined within the given snippet reference
|
||||||
|
* components within this ProcessGroup. If this is not the case, throws
|
||||||
|
* {@link IllegalStateException}.
|
||||||
*
|
*
|
||||||
* @param snippet the snippet
|
* @param snippet the snippet
|
||||||
* @throws NullPointerException if the argument is null
|
* @throws NullPointerException if the argument is null
|
||||||
* @throws IllegalStateException if the snippet contains an ID that references a component that is not part of this ProcessGroup
|
* @throws IllegalStateException if the snippet contains an ID that
|
||||||
|
* references a component that is not part of this ProcessGroup
|
||||||
*/
|
*/
|
||||||
private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
|
private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
|
||||||
requireNonNull(snippet);
|
requireNonNull(snippet);
|
||||||
|
@ -2206,8 +2200,10 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Verifies that all ID's specified by the given set exist as keys in the given Map. If any of the ID's does not exist as a key in the map, will throw {@link IllegalStateException} indicating the
|
* Verifies that all ID's specified by the given set exist as keys in the
|
||||||
* ID that is invalid and specifying the Component Type.
|
* given Map. If any of the ID's does not exist as a key in the map, will
|
||||||
|
* throw {@link IllegalStateException} indicating the ID that is invalid and
|
||||||
|
* specifying the Component Type.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -2286,8 +2282,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
if (connection.getSource().equals(port)) {
|
if (connection.getSource().equals(port)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException("Cannot delete Process Group because Input Port " + port +
|
throw new IllegalStateException("Cannot delete Process Group because Input Port " + port.getIdentifier()
|
||||||
" has at least one incoming connection from a component outside of the Process Group. Delete this connection first.");
|
+ " has at least one incoming connection from a component outside of the Process Group. Delete this connection first.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2297,8 +2293,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
if (connection.getDestination().equals(port)) {
|
if (connection.getDestination().equals(port)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException("Cannot delete Process Group because Output Port " + port +
|
throw new IllegalStateException("Cannot delete Process Group because Output Port " + port.getIdentifier()
|
||||||
" has at least one outgoing connection to a component outside of the Process Group. Delete this connection first.");
|
+ " has at least one outgoing connection to a component outside of the Process Group. Delete this connection first.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2322,7 +2318,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||||
try {
|
try {
|
||||||
if (connectable.getScheduledState() == ScheduledState.STOPPED) {
|
if (connectable.getScheduledState() == ScheduledState.STOPPED) {
|
||||||
if (scheduler.getActiveThreadCount(connectable) > 0) {
|
if (scheduler.getActiveThreadCount(connectable) > 0) {
|
||||||
throw new IllegalStateException("Cannot start " + connectable + " because it is currently stopping");
|
throw new IllegalStateException("Cannot start component with id" + connectable.getIdentifier() + " because it is currently stopping");
|
||||||
}
|
}
|
||||||
|
|
||||||
connectable.verifyCanStart();
|
connectable.verifyCanStart();
|
||||||
|
|
|
@ -94,6 +94,6 @@ public class GhostProcessor implements Processor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "GhostProcessor[id=" + id + ", class=" + canonicalClassName + "]";
|
return "GhostProcessor[id=" + id + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,11 +53,11 @@ public class StandardSchedulingContext implements SchedulingContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (serviceNode.getState() != ControllerServiceState.ENABLED) {
|
if (serviceNode.getState() != ControllerServiceState.ENABLED) {
|
||||||
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently enabled");
|
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently enabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!serviceNode.isValid()) {
|
if (!serviceNode.isValid()) {
|
||||||
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
|
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently valid");
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceNode.addReference(processorNode);
|
serviceNode.addReference(processorNode);
|
||||||
|
|
|
@ -526,7 +526,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (requireNonNull(port).getTargetExists()) {
|
if (requireNonNull(port).getTargetExists()) {
|
||||||
throw new IllegalStateException("Cannot remove Remote Port " + port + " because it still exists on the Remote Instance");
|
throw new IllegalStateException("Cannot remove Remote Port " + port.getIdentifier() + " because it still exists on the Remote Instance");
|
||||||
}
|
}
|
||||||
if (!port.getConnections().isEmpty() || port.hasIncomingConnection()) {
|
if (!port.getConnections().isEmpty() || port.hasIncomingConnection()) {
|
||||||
throw new IllegalStateException("Cannot remove Remote Port because it is connected to other components");
|
throw new IllegalStateException("Cannot remove Remote Port because it is connected to other components");
|
||||||
|
@ -1229,15 +1229,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (isTransmitting()) {
|
if (isTransmitting()) {
|
||||||
throw new IllegalStateException(this + " is transmitting");
|
throw new IllegalStateException(this.getIdentifier() + " is transmitting");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Port port : inputPorts.values()) {
|
for (final Port port : inputPorts.values()) {
|
||||||
if (!ignoreConnections && port.hasIncomingConnection()) {
|
if (!ignoreConnections && port.hasIncomingConnection()) {
|
||||||
throw new IllegalStateException(this + " is the destination of another component");
|
throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
|
||||||
}
|
}
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1249,7 +1249,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1262,26 +1262,26 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (isTransmitting()) {
|
if (isTransmitting()) {
|
||||||
throw new IllegalStateException(this + " is already transmitting");
|
throw new IllegalStateException(this.getIdentifier() + " is already transmitting");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final StandardRemoteGroupPort port : inputPorts.values()) {
|
for (final StandardRemoteGroupPort port : inputPorts.values()) {
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port.hasIncomingConnection() && !port.getTargetExists()) {
|
if (port.hasIncomingConnection() && !port.getTargetExists()) {
|
||||||
throw new IllegalStateException(this + " has a Connection to Port " + port + ", but that Port no longer exists on the remote system");
|
throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final StandardRemoteGroupPort port : outputPorts.values()) {
|
for (final StandardRemoteGroupPort port : outputPorts.values()) {
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!port.getConnections().isEmpty() && !port.getTargetExists()) {
|
if (!port.getConnections().isEmpty() && !port.getTargetExists()) {
|
||||||
throw new IllegalStateException(this + " has a Connection to Port " + port + ", but that Port no longer exists on the remote system");
|
throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1292,7 +1292,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStopTransmitting() {
|
public void verifyCanStopTransmitting() {
|
||||||
if (!isTransmitting()) {
|
if (!isTransmitting()) {
|
||||||
throw new IllegalStateException(this + " is not transmitting");
|
throw new IllegalStateException(this.getIdentifier() + " is not transmitting");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1301,18 +1301,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (isTransmitting()) {
|
if (isTransmitting()) {
|
||||||
throw new IllegalStateException(this + " is currently transmitting");
|
throw new IllegalStateException(this.getIdentifier() + " is currently transmitting");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Port port : inputPorts.values()) {
|
for (final Port port : inputPorts.values()) {
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Port port : outputPorts.values()) {
|
for (final Port port : outputPorts.values()) {
|
||||||
if (port.isRunning()) {
|
if (port.isRunning()) {
|
||||||
throw new IllegalStateException(this + " has running Port: " + port);
|
throw new IllegalStateException(this.getIdentifier() + " has running Port: " + port.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class GhostReportingTask implements ReportingTask {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "GhostReportingTask[id=" + id + ", class=" + canonicalClassName + "]";
|
return "GhostReportingTask[id=" + id + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue