diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index d6eaed6fa5..a876d51b76 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -307,6 +307,8 @@ public class StatusMerger { merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); + target.setTransmitting(Boolean.TRUE.equals(target.isTransmitting()) || Boolean.TRUE.equals(toMerge.isTransmitting())); + if (target.getNodeSnapshots() != null) { final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 5cb26eab3f..873cd33e88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -16,6 +16,24 @@ */ package org.apache.nifi.remote; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult.Result; import org.apache.nifi.authorization.Authorizer; @@ -55,24 +73,6 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static java.util.Objects.requireNonNull; - public class StandardRootGroupPort extends AbstractPort implements RootGroupPort { private static final String CATEGORY = "Site to Site"; @@ -81,10 +81,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort private final AtomicReference> groupAccessControl = new AtomicReference>(new HashSet()); private final AtomicReference> userAccessControl = new AtomicReference>(new HashSet()); - private final ProcessScheduler processScheduler; private final boolean secure; private final Authorizer authorizer; - private final NiFiProperties nifiProperties; private final List identityMappings; @SuppressWarnings("unused") @@ -105,11 +103,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort final NiFiProperties nifiProperties) { super(id, name, processGroup, type, scheduler); - this.processScheduler = scheduler; setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); this.authorizer = authorizer; this.secure = secure; - this.nifiProperties = nifiProperties; this.identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties); this.bulletinRepository = bulletinRepository; this.scheduler = scheduler; @@ -293,10 +289,6 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort return false; } - if (!requestQueue.isEmpty()) { - return true; - } - requestLock.lock(); try { return !activeRequests.isEmpty(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-port.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-port.js index d7530732b4..0523fc581d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-port.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-port.js @@ -410,7 +410,7 @@ nf.Port = (function () { updated.select('text.port-transmission-icon') .attr({ 'font-family': function (d) { - if (d.status.aggregateSnapshot.transmitting === true) { + if (d.status.transmitting === true) { return 'FontAwesome'; } else { return 'flowfont'; @@ -418,21 +418,21 @@ nf.Port = (function () { } }) .text(function (d) { - if (d.status.aggregateSnapshot.transmitting === true) { + if (d.status.transmitting === true) { return '\uf140'; } else { return '\ue80a'; } }) .classed('transmitting', function (d) { - if (d.status.aggregateSnapshot.transmitting === true) { + if (d.status.transmitting === true) { return true; } else { return false; } }) .classed('not-transmitting', function (d) { - if (d.status.aggregateSnapshot.transmitting !== true) { + if (d.status.transmitting !== true) { return true; } else { return false;