mirror of https://github.com/apache/nifi.git
NIFI-10431: Improved log messages and Exception messages to indicate which components they pertain to. Also added a toString() for StandardLabel to make it more clear which Label is being referenced
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #6361.
This commit is contained in:
parent
c19d6a0db1
commit
7a90137cef
|
@ -407,7 +407,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setLossTolerant(final boolean lossTolerant) {
|
public synchronized void setLossTolerant(final boolean lossTolerant) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
this.lossTolerant.set(lossTolerant);
|
this.lossTolerant.set(lossTolerant);
|
||||||
}
|
}
|
||||||
|
@ -421,7 +421,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setAutoTerminatedRelationships(final Set<Relationship> terminate) {
|
public void setAutoTerminatedRelationships(final Set<Relationship> terminate) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
|
undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
|
||||||
|
@ -516,7 +516,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setSchedulingPeriod(final String schedulingPeriod) {
|
public synchronized void setSchedulingPeriod(final String schedulingPeriod) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
//Before setting the new Configuration references, we need to remove the current ones from reference counts.
|
//Before setting the new Configuration references, we need to remove the current ones from reference counts.
|
||||||
|
@ -554,7 +554,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setRunDuration(final long duration, final TimeUnit timeUnit) {
|
public synchronized void setRunDuration(final long duration, final TimeUnit timeUnit) {
|
||||||
if (duration < 0) {
|
if (duration < 0) {
|
||||||
throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to "
|
throw new IllegalArgumentException("Run Duration of " + this + " cannot be set to a negative value; cannot set to "
|
||||||
+ timeUnit.toSeconds(duration) + " seconds");
|
+ timeUnit.toSeconds(duration) + " seconds");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,11 +575,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setYieldPeriod(final String yieldPeriod) {
|
public synchronized void setYieldPeriod(final String yieldPeriod) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
final long yieldNanos = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.NANOSECONDS);
|
final long yieldNanos = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.NANOSECONDS);
|
||||||
if (yieldNanos < 0) {
|
if (yieldNanos < 0) {
|
||||||
throw new IllegalArgumentException("Yield duration must be positive");
|
throw new IllegalArgumentException("Yield duration of " + this + " cannot be set to a negative value: " + yieldNanos + " nanos");
|
||||||
}
|
}
|
||||||
this.yieldPeriod.set(yieldPeriod);
|
this.yieldPeriod.set(yieldPeriod);
|
||||||
this.yieldNanos = yieldNanos;
|
this.yieldNanos = yieldNanos;
|
||||||
|
@ -631,12 +631,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setPenalizationPeriod(final String penalizationPeriod) {
|
public synchronized void setPenalizationPeriod(final String penalizationPeriod) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS);
|
final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS);
|
||||||
if (penalizationMillis < 0) {
|
if (penalizationMillis < 0) {
|
||||||
throw new IllegalArgumentException("Penalization duration must be positive");
|
throw new IllegalArgumentException("Penalization duration of " + this + " cannot be set to a negative value: " + penalizationMillis + " millis");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.penalizationPeriod.set(penalizationPeriod);
|
this.penalizationPeriod.set(penalizationPeriod);
|
||||||
|
@ -654,12 +654,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setMaxConcurrentTasks(final int taskCount) {
|
public synchronized void setMaxConcurrentTasks(final int taskCount) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
|
if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
|
||||||
throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component "
|
throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component "
|
||||||
+ getIdentifier() + " because Scheduling Strategy is not Event Driven");
|
+ this + " because Scheduling Strategy is not Event Driven");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isTriggeredSerially()) {
|
if (!isTriggeredSerially()) {
|
||||||
|
@ -717,8 +717,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
Objects.requireNonNull(connection, "connection cannot be null");
|
Objects.requireNonNull(connection, "connection cannot be null");
|
||||||
|
|
||||||
if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) {
|
if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException("Cannot add a connection to " + this + " because the ProcessorNode is neither the Source nor the Destination");
|
||||||
"Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -803,7 +802,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
// then it is not legal to remove this relationship from
|
// then it is not legal to remove this relationship from
|
||||||
// this connection.
|
// this connection.
|
||||||
throw new IllegalStateException("Cannot remove relationship " + rel.getName()
|
throw new IllegalStateException("Cannot remove relationship " + rel.getName()
|
||||||
+ " from Connection because doing so would invalidate Processor " + this
|
+ " from Connection " + connection + " because doing so would invalidate " + this
|
||||||
+ ", which is currently running");
|
+ ", which is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -858,8 +857,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
for (final Relationship relationship : connection.getRelationships()) {
|
for (final Relationship relationship : connection.getRelationships()) {
|
||||||
final Set<Connection> connectionsForRelationship = getConnections(relationship);
|
final Set<Connection> connectionsForRelationship = getConnections(relationship);
|
||||||
if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) {
|
if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(connection + " cannot be removed because its source is running and removing it will invalidate " + this);
|
||||||
"This connection cannot be removed because its source is running and removing it will invalidate this processor");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -881,8 +879,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!connectionRemoved) {
|
if (!connectionRemoved) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException("Cannot remove " + connection + " from " + this + " because the ProcessorNode is not the Source");
|
||||||
"Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Resetting Validation State of {} due to connection removed", this);
|
LOG.debug("Resetting Validation State of {} due to connection removed", this);
|
||||||
|
@ -929,7 +926,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setProcessor(final LoggableComponent<Processor> processor) {
|
public synchronized void setProcessor(final LoggableComponent<Processor> processor) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ProcessorDetails processorDetails = new ProcessorDetails(processor);
|
final ProcessorDetails processorDetails = new ProcessorDetails(processor);
|
||||||
|
@ -1368,7 +1365,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setAnnotationData(final String data) {
|
public void setAnnotationData(final String data) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot set AnnotationData while processor is running");
|
throw new IllegalStateException("Cannot set AnnotationData on " + this + " while processor is running");
|
||||||
}
|
}
|
||||||
super.setAnnotationData(data);
|
super.setAnnotationData(data);
|
||||||
}
|
}
|
||||||
|
@ -1381,7 +1378,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete(final boolean ignoreConnections) {
|
public void verifyCanDelete(final boolean ignoreConnections) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is running");
|
throw new IllegalStateException("Cannot delete " + this + " because Processor is running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ignoreConnections) {
|
if (!ignoreConnections) {
|
||||||
|
@ -1395,7 +1392,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
if (connection.getSource().equals(this)) {
|
if (connection.getSource().equals(this)) {
|
||||||
connection.verifyCanDelete();
|
connection.verifyCanDelete();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
|
throw new IllegalStateException("Cannot delete " + this + " because it is the destination of another component");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1410,7 +1407,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
||||||
final ScheduledState currentState = getPhysicalScheduledState();
|
final ScheduledState currentState = getPhysicalScheduledState();
|
||||||
if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.DISABLED) {
|
if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not stopped. Current state is " + currentState.name());
|
throw new IllegalStateException(this + " cannot be started because it is not stopped. Current state is " + currentState.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
|
@ -1419,33 +1416,33 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
case VALID:
|
case VALID:
|
||||||
return;
|
return;
|
||||||
case VALIDATING:
|
case VALIDATING:
|
||||||
throw new IllegalStateException("Processor with ID " + getIdentifier() + " cannot be started because its validation is still being performed");
|
throw new IllegalStateException(this + " cannot be started because its validation is still being performed");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
|
final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
|
||||||
if (ignoredReferences != null && !validationErrors.isEmpty()) {
|
if (ignoredReferences != null && !validationErrors.isEmpty()) {
|
||||||
throw new IllegalStateException("Processor with ID " + getIdentifier() + " cannot be started because it is not currently valid");
|
throw new IllegalStateException(this + " cannot be started because it is not currently valid");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStop() {
|
public void verifyCanStop() {
|
||||||
if (getScheduledState() != ScheduledState.RUNNING) {
|
if (getScheduledState() != ScheduledState.RUNNING) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
|
throw new IllegalStateException(this + " cannot be stopped because is not scheduled to run");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
throw new IllegalStateException(this + " cannot be updated because it is not stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable() {
|
public void verifyCanEnable() {
|
||||||
if (getScheduledState() != ScheduledState.DISABLED) {
|
if (getScheduledState() != ScheduledState.DISABLED) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is not disabled");
|
throw new IllegalStateException(this + " cannot be enabled because is not disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
|
@ -1454,7 +1451,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDisable() {
|
public void verifyCanDisable() {
|
||||||
if (getScheduledState() != ScheduledState.STOPPED) {
|
if (getScheduledState() != ScheduledState.STOPPED) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " is not stopped");
|
throw new IllegalStateException(this + " cannot be disabled because is not stopped");
|
||||||
}
|
}
|
||||||
verifyNoActiveThreads();
|
verifyNoActiveThreads();
|
||||||
}
|
}
|
||||||
|
@ -1468,7 +1465,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
if (hasActiveThreads) {
|
if (hasActiveThreads) {
|
||||||
final int threadCount = getActiveThreadCount();
|
final int threadCount = getActiveThreadCount();
|
||||||
if (threadCount > 0) {
|
if (threadCount > 0) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active");
|
throw new IllegalStateException(this + " has " + threadCount + " threads still active");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1476,7 +1473,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyModifiable() throws IllegalStateException {
|
public void verifyModifiable() throws IllegalStateException {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1684,7 +1681,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanTerminate() {
|
public void verifyCanTerminate() {
|
||||||
if (getScheduledState() != ScheduledState.STOPPED && getScheduledState() != ScheduledState.RUN_ONCE) {
|
if (getScheduledState() != ScheduledState.STOPPED && getScheduledState() != ScheduledState.RUN_ONCE) {
|
||||||
throw new IllegalStateException("Processor is not stopped");
|
throw new IllegalStateException("Cannot terminate " + this + " because Processor is not stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1955,7 +1952,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setRetryCount(Integer retryCount) {
|
public void setRetryCount(Integer retryCount) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
this.retryCount = (retryCount == null) ? 0 : retryCount;
|
this.retryCount = (retryCount == null) ? 0 : retryCount;
|
||||||
}
|
}
|
||||||
|
@ -1968,7 +1965,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setRetriedRelationships(Set<String> retriedRelationships) {
|
public void setRetriedRelationships(Set<String> retriedRelationships) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
|
this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
|
||||||
}
|
}
|
||||||
|
@ -1990,7 +1987,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
|
public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;
|
this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;
|
||||||
}
|
}
|
||||||
|
@ -2003,14 +2000,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
@Override
|
@Override
|
||||||
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
|
public void setMaxBackoffPeriod(String maxBackoffPeriod) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
|
||||||
}
|
}
|
||||||
if (maxBackoffPeriod == null) {
|
if (maxBackoffPeriod == null) {
|
||||||
maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD;
|
maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD;
|
||||||
}
|
}
|
||||||
final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS);
|
final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS);
|
||||||
if (backoffNanos < 0) {
|
if (backoffNanos < 0) {
|
||||||
throw new IllegalArgumentException("Max Backoff Period must be positive");
|
throw new IllegalArgumentException("Cannot set Max Backoff Period of " + this + " to negative value: " + backoffNanos + " nanos");
|
||||||
}
|
}
|
||||||
this.maxBackoffPeriod = maxBackoffPeriod;
|
this.maxBackoffPeriod = maxBackoffPeriod;
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class StandardLabel implements Label {
|
||||||
private final AtomicLong zIndex = new AtomicLong(DEFAULT_Z_INDEX);
|
private final AtomicLong zIndex = new AtomicLong(DEFAULT_Z_INDEX);
|
||||||
|
|
||||||
public StandardLabel(final String identifier, final String value) {
|
public StandardLabel(final String identifier, final String value) {
|
||||||
this(identifier, new Position(0D, 0D), new HashMap<String, String>(), value, null);
|
this(identifier, new Position(0D, 0D), new HashMap<>(), value, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public StandardLabel(final String identifier, final Position position, final Map<String, String> style, final String value, final ProcessGroup processGroup) {
|
public StandardLabel(final String identifier, final Position position, final Map<String, String> style, final String value, final ProcessGroup processGroup) {
|
||||||
|
@ -172,4 +172,19 @@ public class StandardLabel implements Label {
|
||||||
public void setZIndex(final long zIndex) {
|
public void setZIndex(final long zIndex) {
|
||||||
this.zIndex.set(zIndex);
|
this.zIndex.set(zIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "StandardLabel[id=" + identifier + ", text=" + ellipsis(getValue(), 50) + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String ellipsis(final String value, final int maxLength) {
|
||||||
|
if (value == null) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
if (value.length() <= maxLength) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
return value.substring(0, maxLength) + "...";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,7 +165,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
@Override
|
@Override
|
||||||
public void setReportingTask(final LoggableComponent<ReportingTask> reportingTask) {
|
public void setReportingTask(final LoggableComponent<ReportingTask> reportingTask) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Reporting Task configuration while Reporting Task is running");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while Reporting Task is running");
|
||||||
}
|
}
|
||||||
this.reportingTaskRef.set(new ReportingTaskDetails(reportingTask));
|
this.reportingTaskRef.set(new ReportingTaskDetails(reportingTask));
|
||||||
}
|
}
|
||||||
|
@ -200,7 +200,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
@Override
|
@Override
|
||||||
public void verifyModifiable() throws IllegalStateException {
|
public void verifyModifiable() throws IllegalStateException {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running");
|
throw new IllegalStateException("Cannot modify " + this + " while the Reporting Task is running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,37 +231,37 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot delete " + getReportingTask().getIdentifier() + " because it is currently running");
|
throw new IllegalStateException("Cannot delete " + this + " because it is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDisable() {
|
public void verifyCanDisable() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is currently running");
|
throw new IllegalStateException("Cannot disable " + this + " because it is currently running");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isDisabled()) {
|
if (isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is already disabled");
|
throw new IllegalStateException("Cannot disable " + this + " because it is already disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanEnable() {
|
public void verifyCanEnable() {
|
||||||
if (!isDisabled()) {
|
if (!isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot enable " + getReportingTask().getIdentifier() + " because it is not disabled");
|
throw new IllegalStateException("Cannot enable " + this + " because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStart() {
|
public void verifyCanStart() {
|
||||||
if (isDisabled()) {
|
if (isDisabled()) {
|
||||||
throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is currently disabled");
|
throw new IllegalStateException("Cannot start " + this + " because it is currently disabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ValidationState validationState = getValidationState();
|
final ValidationState validationState = getValidationState();
|
||||||
if (validationState.getStatus() == ValidationStatus.INVALID) {
|
if (validationState.getStatus() == ValidationStatus.INVALID) {
|
||||||
throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() +
|
throw new IllegalStateException("Cannot start " + this +
|
||||||
" because it is invalid with the following validation errors: " + validationState.getValidationErrors());
|
" because it is invalid with the following validation errors: " + validationState.getValidationErrors());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -269,14 +269,14 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanStop() {
|
public void verifyCanStop() {
|
||||||
if (!isRunning()) {
|
if (!isRunning()) {
|
||||||
throw new IllegalStateException("Cannot stop " + getReportingTask().getIdentifier() + " because it is not running");
|
throw new IllegalStateException("Cannot stop " + this + " because it is not running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot update " + getReportingTask().getIdentifier() + " because it is currently running");
|
throw new IllegalStateException("Cannot update " + this + " because it is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,26 +289,26 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
|
||||||
switch (getScheduledState()) {
|
switch (getScheduledState()) {
|
||||||
case DISABLED:
|
case DISABLED:
|
||||||
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
|
throw new IllegalStateException(this + " cannot be started because it is disabled");
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
|
throw new IllegalStateException(this + " cannot be started because it is already running");
|
||||||
case STOPPED:
|
case STOPPED:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
final int activeThreadCount = getActiveThreadCount();
|
final int activeThreadCount = getActiveThreadCount();
|
||||||
if (activeThreadCount > 0) {
|
if (activeThreadCount > 0) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it has " + activeThreadCount + " active threads already");
|
throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Collection<ValidationResult> validationResults = getValidationErrors(ignoredReferences);
|
final Collection<ValidationResult> validationResults = getValidationErrors(ignoredReferences);
|
||||||
if (!validationResults.isEmpty()) {
|
if (!validationResults.isEmpty()) {
|
||||||
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not currently valid");
|
throw new IllegalStateException(this + " cannot be started because it is not currently valid");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ReportingTask[id=" + getIdentifier() + "]";
|
return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -324,7 +324,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanPerformVerification() {
|
public void verifyCanPerformVerification() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException("Cannot perform verification because Reporting Task is not fully stopped");
|
throw new IllegalStateException("Cannot perform verification of " + this + " because Reporting Task is not fully stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
final ControllerServiceInvocationHandler invocationHandler) {
|
final ControllerServiceInvocationHandler invocationHandler) {
|
||||||
synchronized (this.active) {
|
synchronized (this.active) {
|
||||||
if (isActive()) {
|
if (isActive()) {
|
||||||
throw new IllegalStateException("Cannot modify Controller Service configuration while service is active");
|
throw new IllegalStateException("Cannot modify configuration of " + this + " while service is active");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ControllerServiceDetails controllerServiceDetails = new ControllerServiceDetails(implementation, proxiedControllerService, invocationHandler);
|
final ControllerServiceDetails controllerServiceDetails = new ControllerServiceDetails(implementation, proxiedControllerService, invocationHandler);
|
||||||
|
@ -308,11 +308,11 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
|
|
||||||
if (state == ControllerServiceState.DISABLING) {
|
if (state == ControllerServiceState.DISABLING) {
|
||||||
// Provide precise/accurate error message for DISABLING case
|
// Provide precise/accurate error message for DISABLING case
|
||||||
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently still disabling. " +
|
throw new IllegalStateException("Cannot modify configuration of " + this + " because it is currently still disabling. " +
|
||||||
"Please wait for the service to fully disable before attempting to modify it.");
|
"Please wait for the service to fully disable before attempting to modify it.");
|
||||||
}
|
}
|
||||||
if (state != ControllerServiceState.DISABLED) {
|
if (state != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently not disabled - it has a state of " + state
|
throw new IllegalStateException("Cannot modify configuration of " + this + " because it is currently not disabled - it has a state of " + state
|
||||||
+ ". Please disable the Controller Service first.");
|
+ ". Please disable the Controller Service first.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -320,7 +320,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
if (getState() != ControllerServiceState.DISABLED) {
|
||||||
throw new IllegalStateException("Controller Service " + getControllerServiceImplementation().getIdentifier() + " cannot be deleted because it is not disabled");
|
throw new IllegalStateException(this + " cannot be deleted because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +345,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!activeReferencesIdentifiers.isEmpty()) {
|
if (!activeReferencesIdentifiers.isEmpty()) {
|
||||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be disabled because it is referenced by " + activeReferencesIdentifiers.size() +
|
throw new IllegalStateException(this + " cannot be disabled because it is referenced by " + activeReferencesIdentifiers.size() +
|
||||||
" components that are currently running: [" + StringUtils.join(activeReferencesIdentifiers, ", ") + "]");
|
" components that are currently running: [" + StringUtils.join(activeReferencesIdentifiers, ", ") + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -357,10 +357,10 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
case DISABLED:
|
case DISABLED:
|
||||||
return;
|
return;
|
||||||
case DISABLING:
|
case DISABLING:
|
||||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
|
throw new IllegalStateException(this + " cannot be enabled because it is not disabled - it has a state of " + state);
|
||||||
default:
|
default:
|
||||||
if (isReloadAdditionalResourcesNecessary()) {
|
if (isReloadAdditionalResourcesNecessary()) {
|
||||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because additional resources are needed - it has a state of " + state);
|
throw new IllegalStateException(this + " cannot be enabled because additional resources are needed - it has a state of " + state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -372,8 +372,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanUpdate() {
|
public void verifyCanUpdate() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
final ControllerServiceState state = getState();
|
||||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be updated because it is not disabled");
|
if (state != ControllerServiceState.DISABLED) {
|
||||||
|
throw new IllegalStateException(this + " cannot be updated because it is not disabled - it has a state of " + state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,8 +433,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanPerformVerification() {
|
public void verifyCanPerformVerification() {
|
||||||
if (getState() != ControllerServiceState.DISABLED) {
|
final ControllerServiceState state = getState();
|
||||||
throw new IllegalStateException("Cannot perform verification because the Controller Service is not disabled");
|
if (state != ControllerServiceState.DISABLED) {
|
||||||
|
throw new IllegalStateException("Cannot perform verification because the " + this + " is not disabled - it has a state of " + state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,4 +48,9 @@ public class ControllerServiceLogObserver implements LogObserver {
|
||||||
serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
|
serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
|
||||||
bulletinRepository.addBulletin(bulletin);
|
bulletinRepository.addBulletin(bulletin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getComponentDescription() {
|
||||||
|
return serviceNode.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,4 +44,9 @@ public class ProcessorLogObserver implements LogObserver {
|
||||||
bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid()));
|
bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getComponentDescription() {
|
||||||
|
return processorNode.toString();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,4 +42,9 @@ public class ReportingTaskLogObserver implements LogObserver {
|
||||||
taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
|
taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
|
||||||
bulletinRepository.addBulletin(bulletin);
|
bulletinRepository.addBulletin(bulletin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getComponentDescription() {
|
||||||
|
return taskNode.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,8 @@ public class StandardLogRepository implements LogRepository {
|
||||||
try {
|
try {
|
||||||
// ensure observer does not exists
|
// ensure observer does not exists
|
||||||
if (observerLookup.containsKey(observerIdentifier)) {
|
if (observerLookup.containsKey(observerIdentifier)) {
|
||||||
throw new IllegalStateException("The specified observer identifier (" + observerIdentifier + ") already exists.");
|
throw new IllegalStateException("Cannot add Log Observer for " + observer.getComponentDescription() +
|
||||||
|
" because the specified observer identifier (" + observerIdentifier + ") already exists.");
|
||||||
}
|
}
|
||||||
|
|
||||||
final LogLevel[] allLevels = LogLevel.values();
|
final LogLevel[] allLevels = LogLevel.values();
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.controller.label;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
public class TestStandardLabel {
|
||||||
|
@Test
|
||||||
|
public void testToString() {
|
||||||
|
final StandardLabel nullValue = new StandardLabel("id", null);
|
||||||
|
assertEquals("StandardLabel[id=id, text=]", nullValue.toString());
|
||||||
|
|
||||||
|
final StandardLabel shortValue = new StandardLabel("id", "short");
|
||||||
|
assertEquals("StandardLabel[id=id, text=short]", shortValue.toString());
|
||||||
|
|
||||||
|
final StandardLabel longValue = new StandardLabel("id", "123456789012345678901234567890123456789012345678901234567890");
|
||||||
|
assertEquals("StandardLabel[id=id, text=12345678901234567890123456789012345678901234567890...]", longValue.toString());
|
||||||
|
}
|
||||||
|
}
|
|
@ -79,6 +79,11 @@ public class TestStandardLogRepository {
|
||||||
messages.add(message);
|
messages.add(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getComponentDescription() {
|
||||||
|
return "MockLogObserver";
|
||||||
|
}
|
||||||
|
|
||||||
public List<LogMessage> getMessages() {
|
public List<LogMessage> getMessages() {
|
||||||
return messages;
|
return messages;
|
||||||
}
|
}
|
||||||
|
|
|
@ -705,6 +705,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) {
|
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) {
|
||||||
return getComponent().toString();
|
return getComponent().toString();
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
return getClass().getSimpleName() + "[id=" + getIdentifier() + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,4 +19,7 @@ package org.apache.nifi.logging;
|
||||||
public interface LogObserver {
|
public interface LogObserver {
|
||||||
|
|
||||||
void onLogMessage(LogMessage message);
|
void onLogMessage(LogMessage message);
|
||||||
|
|
||||||
|
String getComponentDescription();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue