NIFI-9061: Eliminated the nifi.cluster.node.protocol.threads property in favor of nifi.cluster.node.protocol.max.threads property so that we can properly scale out the number of threads used for HTTP request replication. Implementing a caching mechanism for creating the DateTimeFormatter used by TimeAdapter in order to improve performance when parsing timestamps in web requests. Implementing caching logic for caching the number of characters that can rendered without needing an ellipsis for some components in the UI (#5316)

This closes #5316
This commit is contained in:
markap14 2021-09-13 14:36:35 -04:00 committed by GitHub
parent 38a1f476e3
commit 73f88b3239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 195 additions and 83 deletions

View File

@ -243,7 +243,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node"; public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node";
public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address"; public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port"; public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads";
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
@ -878,15 +877,7 @@ public class NiFiProperties extends ApplicationProperties {
*/ */
@Deprecated() @Deprecated()
public int getClusterNodeProtocolThreads() { public int getClusterNodeProtocolThreads() {
return getClusterNodeProtocolCorePoolSize(); return getClusterNodeProtocolMaxPoolSize();
}
public int getClusterNodeProtocolCorePoolSize() {
try {
return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
} catch (NumberFormatException nfe) {
return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS;
}
} }
public int getClusterNodeProtocolMaxPoolSize() { public int getClusterNodeProtocolMaxPoolSize() {

View File

@ -167,7 +167,6 @@ volume to provide certificates on the host system to the container instance.
| nifi.cluster.is.node | NIFI_CLUSTER_IS_NODE | | nifi.cluster.is.node | NIFI_CLUSTER_IS_NODE |
| nifi.cluster.node.address | NIFI_CLUSTER_ADDRESS | | nifi.cluster.node.address | NIFI_CLUSTER_ADDRESS |
| nifi.cluster.node.protocol.port | NIFI_CLUSTER_NODE_PROTOCOL_PORT | | nifi.cluster.node.protocol.port | NIFI_CLUSTER_NODE_PROTOCOL_PORT |
| nifi.cluster.node.protocol.threads | NIFI_CLUSTER_NODE_PROTOCOL_THREADS |
| nifi.cluster.node.protocol.max.threads | NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS | | nifi.cluster.node.protocol.max.threads | NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS |
| nifi.zookeeper.connect.string | NIFI_ZK_CONNECT_STRING | | nifi.zookeeper.connect.string | NIFI_ZK_CONNECT_STRING |
| nifi.zookeeper.root.node | NIFI_ZK_ROOT_NODE | | nifi.zookeeper.root.node | NIFI_ZK_ROOT_NODE |

View File

@ -82,7 +82,6 @@ prop_replace 'nifi.variable.registry.properties' "${NIFI_VARIABLE_REGISTRY_PR
prop_replace 'nifi.cluster.is.node' "${NIFI_CLUSTER_IS_NODE:-false}" prop_replace 'nifi.cluster.is.node' "${NIFI_CLUSTER_IS_NODE:-false}"
prop_replace 'nifi.cluster.node.address' "${NIFI_CLUSTER_ADDRESS:-$HOSTNAME}" prop_replace 'nifi.cluster.node.address' "${NIFI_CLUSTER_ADDRESS:-$HOSTNAME}"
prop_replace 'nifi.cluster.node.protocol.port' "${NIFI_CLUSTER_NODE_PROTOCOL_PORT:-}" prop_replace 'nifi.cluster.node.protocol.port' "${NIFI_CLUSTER_NODE_PROTOCOL_PORT:-}"
prop_replace 'nifi.cluster.node.protocol.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_THREADS:-10}"
prop_replace 'nifi.cluster.node.protocol.max.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS:-50}" prop_replace 'nifi.cluster.node.protocol.max.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS:-50}"
prop_replace 'nifi.zookeeper.connect.string' "${NIFI_ZK_CONNECT_STRING:-}" prop_replace 'nifi.zookeeper.connect.string' "${NIFI_ZK_CONNECT_STRING:-}"
prop_replace 'nifi.zookeeper.root.node' "${NIFI_ZK_ROOT_NODE:-/nifi}" prop_replace 'nifi.zookeeper.root.node' "${NIFI_ZK_ROOT_NODE:-/nifi}"

View File

@ -82,7 +82,6 @@ prop_replace 'nifi.variable.registry.properties' "${NIFI_VARIABLE_REGISTRY_PR
prop_replace 'nifi.cluster.is.node' "${NIFI_CLUSTER_IS_NODE:-false}" prop_replace 'nifi.cluster.is.node' "${NIFI_CLUSTER_IS_NODE:-false}"
prop_replace 'nifi.cluster.node.address' "${NIFI_CLUSTER_ADDRESS:-$HOSTNAME}" prop_replace 'nifi.cluster.node.address' "${NIFI_CLUSTER_ADDRESS:-$HOSTNAME}"
prop_replace 'nifi.cluster.node.protocol.port' "${NIFI_CLUSTER_NODE_PROTOCOL_PORT:-}" prop_replace 'nifi.cluster.node.protocol.port' "${NIFI_CLUSTER_NODE_PROTOCOL_PORT:-}"
prop_replace 'nifi.cluster.node.protocol.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_THREADS:-10}"
prop_replace 'nifi.cluster.node.protocol.max.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS:-50}" prop_replace 'nifi.cluster.node.protocol.max.threads' "${NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS:-50}"
prop_replace 'nifi.zookeeper.connect.string' "${NIFI_ZK_CONNECT_STRING:-}" prop_replace 'nifi.zookeeper.connect.string' "${NIFI_ZK_CONNECT_STRING:-}"
prop_replace 'nifi.zookeeper.root.node' "${NIFI_ZK_ROOT_NODE:-/nifi}" prop_replace 'nifi.zookeeper.root.node' "${NIFI_ZK_ROOT_NODE:-/nifi}"

View File

@ -2135,14 +2135,9 @@ configured in the _state-management.xml_ file. See <<state_providers>> for more
** `nifi.cluster.is.node` - Set this to _true_. ** `nifi.cluster.is.node` - Set this to _true_.
** `nifi.cluster.node.address` - Set this to the fully qualified hostname of the node. If left blank, it defaults to `localhost`. ** `nifi.cluster.node.address` - Set this to the fully qualified hostname of the node. If left blank, it defaults to `localhost`.
** `nifi.cluster.node.protocol.port` - Set this to an open port that is higher than 1024 (anything lower requires root). ** `nifi.cluster.node.protocol.port` - Set this to an open port that is higher than 1024 (anything lower requires root).
** `nifi.cluster.node.protocol.threads` - The number of threads that should be used to communicate with other nodes in the cluster. This property
defaults to `10`. A thread pool is used for replicating requests to all nodes, and the
thread pool will never have fewer than this number of threads. It will grow as needed up to the maximum value set by the `nifi.cluster.node.protocol.max.threads`
property.
** `nifi.cluster.node.protocol.max.threads` - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property ** `nifi.cluster.node.protocol.max.threads` - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property
defaults to `50`. A thread pool is used for replication requests to all nodes, and the thread pool will have a "core" size that is configured by the defaults to `50`. A thread pool is used for replicating requests to all nodes. The thread pool will increase the number of active threads to the limit
`nifi.cluster.node.protocol.threads` property. However, if necessary, the thread pool will increase the number of active threads to the limit set by this property. It is typically recommended that this property be set to 4-8 times the number of nodes in your cluster.
set by this property.
** `nifi.zookeeper.connect.string` - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separated list ** `nifi.zookeeper.connect.string` - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separated list
of hostname:port pairs. For example, `localhost:2181,localhost:2182,localhost:2183`. This should contain a list of all ZooKeeper of hostname:port pairs. For example, `localhost:2181,localhost:2182,localhost:2183`. This should contain a list of all ZooKeeper
instances in the ZooKeeper quorum. instances in the ZooKeeper quorum.
@ -3878,8 +3873,6 @@ Configure these properties for cluster nodes.
|`nifi.cluster.is.node`|Set this to `true` if the instance is a node in a cluster. The default value is `false`. |`nifi.cluster.is.node`|Set this to `true` if the instance is a node in a cluster. The default value is `false`.
|`nifi.cluster.node.address`|The fully qualified address of the node. It is blank by default. |`nifi.cluster.node.address`|The fully qualified address of the node. It is blank by default.
|`nifi.cluster.node.protocol.port`|The node's protocol port. It is blank by default. |`nifi.cluster.node.protocol.port`|The node's protocol port. It is blank by default.
|`nifi.cluster.node.protocol.threads`|The number of threads that should be used to communicate with other nodes
in the cluster. This property defaults to `10`, but for large clusters, this value may need to be larger.
|`nifi.cluster.node.protocol.max.threads`|The maximum number of threads that should be used to communicate with other nodes in the cluster. This property defaults to `50`. |`nifi.cluster.node.protocol.max.threads`|The maximum number of threads that should be used to communicate with other nodes in the cluster. This property defaults to `50`.
|`nifi.cluster.node.event.history.size`|When the state of a node in the cluster is changed, an event is generated |`nifi.cluster.node.event.history.size`|When the state of a node in the cluster is changed, an event is generated
and can be viewed in the Cluster page. This value indicates how many events to keep in memory for each node. The default value is `25`. and can be viewed in the Cluster page. This value indicates how many events to keep in memory for each node. The default value is `25`.

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.util;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
* Some adapters need to create a Date object from a String that contains only a portion of it, such as the time.
* This is handled by using a DateTimeFormatter with defaulted values for some fields. The defaults are derived from
* the current date/time. Because of this, we can't just create a DateTimeFormatter once and never change it, as doing
* so would result in the wrong date/time after midnight, when the date changes.
* But we don't want to create a new instance of DateTimeFormatter every time, either, because it uses lazy initialization,
* so the first call to parse() is far more expensive than subsequent calls.
* This class allows us to easily create a DateTimeFormatter and cache it, continually reusing it, until the date changes.
*/
public class ParseDefaultingDateTimeFormatter {
private final AtomicReference<Wrapper> wrapperReference = new AtomicReference<>();
private final Function<LocalDateTime, String> applicabilityTransform;
private final Function<LocalDateTime, DateTimeFormatter> formatFactory;
/**
* Default constructor
* @param applicabilityTransform a transform that creates a String that can be used identify whether or not previously created DateTimeFormatter exists. This may be created, for instance,
* by concatenating specific fields from the given LocalDateTime
* @param formatFactory a transform that will give us a DateTimeFormatter that is applicable for the given LocalDateTime
*/
public ParseDefaultingDateTimeFormatter(final Function<LocalDateTime, String> applicabilityTransform, final Function<LocalDateTime, DateTimeFormatter> formatFactory) {
this.applicabilityTransform = applicabilityTransform;
this.formatFactory = formatFactory;
}
public DateTimeFormatter get() {
final LocalDateTime now = LocalDateTime.now();
final String applicabilityValue = applicabilityTransform.apply(now);
final Wrapper wrapper = wrapperReference.get();
if (wrapper != null && wrapper.getApplicabilityValue().equals(applicabilityValue)) {
return wrapper.getFormatter();
}
final DateTimeFormatter formatter = formatFactory.apply(now);
final Wrapper updatedWrapper = new Wrapper(formatter, applicabilityValue);
wrapperReference.compareAndSet(wrapper, updatedWrapper);
return formatter;
}
private static class Wrapper {
private final DateTimeFormatter formatter;
private final String applicabilityValue;
public Wrapper(final DateTimeFormatter formatter, final String applicabilityValue) {
this.formatter = formatter;
this.applicabilityValue = applicabilityValue;
}
public DateTimeFormatter getFormatter() {
return formatter;
}
public String getApplicabilityValue() {
return applicabilityValue;
}
}
}

View File

@ -37,23 +37,29 @@ public class TimeAdapter extends XmlAdapter<String, Date> {
private static final ZoneId ZONE_ID = TimeZone.getDefault().toZoneId(); private static final ZoneId ZONE_ID = TimeZone.getDefault().toZoneId();
@Override @Override
public String marshal(Date date) throws Exception { public String marshal(Date date) {
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT, Locale.US); final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT, Locale.US);
final ZonedDateTime localDateTime = ZonedDateTime.ofInstant(date.toInstant(), ZONE_ID); final ZonedDateTime localDateTime = ZonedDateTime.ofInstant(date.toInstant(), ZONE_ID);
return formatter.format(localDateTime); return formatter.format(localDateTime);
} }
@Override
public Date unmarshal(String date) throws Exception { private final ParseDefaultingDateTimeFormatter formatter = new ParseDefaultingDateTimeFormatter(
final LocalDateTime now = LocalDateTime.now(); timestamp -> String.format("%s%s%s", timestamp.getYear(), timestamp.getMonthValue(), timestamp.getDayOfMonth()),
final DateTimeFormatter parser = new DateTimeFormatterBuilder().appendPattern(DEFAULT_TIME_FORMAT) timestamp -> new DateTimeFormatterBuilder().appendPattern(DEFAULT_TIME_FORMAT)
.parseDefaulting(ChronoField.YEAR, now.getYear()) .parseDefaulting(ChronoField.YEAR, timestamp.getYear())
.parseDefaulting(ChronoField.MONTH_OF_YEAR, now.getMonthValue()) .parseDefaulting(ChronoField.MONTH_OF_YEAR, timestamp.getMonthValue())
.parseDefaulting(ChronoField.DAY_OF_MONTH, now.getDayOfMonth()) .parseDefaulting(ChronoField.DAY_OF_MONTH, timestamp.getDayOfMonth())
.parseDefaulting(ChronoField.MILLI_OF_SECOND, 0) .parseDefaulting(ChronoField.MILLI_OF_SECOND, 0)
.toFormatter(Locale.US); .toFormatter(Locale.US));
@Override
public Date unmarshal(String date) {
final DateTimeFormatter parser = formatter.get();
final LocalDateTime parsedDateTime = LocalDateTime.parse(date, parser); final LocalDateTime parsedDateTime = LocalDateTime.parse(date, parser);
final LocalDateTime now = LocalDateTime.now();
return Date.from(parsedDateTime.toInstant(ZONE_ID.getRules().getOffset(now))); return Date.from(parsedDateTime.toInstant(ZONE_ID.getRules().getOffset(now)));
} }
} }

View File

@ -103,7 +103,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
/** /**
* Creates an instance. * Creates an instance.
* *
* @param corePoolSize core size of the thread pool
* @param maxPoolSize the max number of threads in the thread pool * @param maxPoolSize the max number of threads in the thread pool
* @param maxConcurrentRequests maximum number of concurrent requests * @param maxConcurrentRequests maximum number of concurrent requests
* @param client a client for making requests * @param client a client for making requests
@ -112,12 +111,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
* @param nifiProperties properties * @param nifiProperties properties
*/ */
public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final HttpReplicationClient client, public ThreadPoolRequestReplicator(final int maxPoolSize, final int maxConcurrentRequests, final HttpReplicationClient client,
final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
if (corePoolSize <= 0) { if (maxPoolSize < 2) {
throw new IllegalArgumentException("The Core Pool Size must be greater than zero."); throw new IllegalArgumentException("Max Pool Size must be >= 2");
} else if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size.");
} else if (client == null) { } else if (client == null) {
throw new IllegalArgumentException("Client may not be null."); throw new IllegalArgumentException("Client may not be null.");
} }
@ -138,7 +135,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return t; return t;
}; };
executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); executorService = new ThreadPoolExecutor(maxPoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
executorService.allowCoreThreadTimeOut(true);
maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override @Override

View File

@ -42,13 +42,12 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class); final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class); final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class);
final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize();
final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize(); final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize();
final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests(); final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests();
final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(nifiProperties); final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(nifiProperties);
replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, replicationClient, clusterCoordinator, replicator = new ThreadPoolRequestReplicator(maxPoolSize, maxConcurrentRequests, replicationClient, clusterCoordinator,
requestCompletionCallback, eventReporter, nifiProperties); requestCompletionCallback, eventReporter, nifiProperties);
} }

View File

@ -268,7 +268,7 @@ public class TestThreadPoolRequestReplicator {
final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> {
}; };
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) {
@Override @Override
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final StandardAsyncClusterResponse response) { final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
@ -345,7 +345,7 @@ public class TestThreadPoolRequestReplicator {
final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> {
}; };
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) {
@Override @Override
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
boolean indicateReplicated, boolean verify) { boolean indicateReplicated, boolean verify) {
@ -408,7 +408,7 @@ public class TestThreadPoolRequestReplicator {
final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> {
}; };
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, props) {
@Override @Override
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final StandardAsyncClusterResponse response) { final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
@ -628,7 +628,7 @@ public class TestThreadPoolRequestReplicator {
final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> { final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> {
}; };
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, nifiProps) { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, nifiProps) {
@Override @Override
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, final String requestId, protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, final String requestId,
final StandardAsyncClusterResponse response) { final StandardAsyncClusterResponse response) {

View File

@ -34,7 +34,6 @@ public class NiFiPropertiesDiagnosticTask implements DiagnosticTask {
"nifi.zookeeper.session.timeout", "nifi.zookeeper.session.timeout",
"nifi.ui.autorefresh.interval", "nifi.ui.autorefresh.interval",
"nifi.cluster.node.protocol.max.threads", "nifi.cluster.node.protocol.max.threads",
"nifi.cluster.node.protocol.threads",
"nifi.security.allow.anonymous.authentication", "nifi.security.allow.anonymous.authentication",
"nifi.security.user.login.identity.provider", "nifi.security.user.login.identity.provider",
"nifi.security.user.authorizer", "nifi.security.user.authorizer",

View File

@ -210,7 +210,6 @@
<nifi.cluster.is.node>false</nifi.cluster.is.node> <nifi.cluster.is.node>false</nifi.cluster.is.node>
<nifi.cluster.node.address /> <nifi.cluster.node.address />
<nifi.cluster.node.protocol.port /> <nifi.cluster.node.protocol.port />
<nifi.cluster.node.protocol.threads>10</nifi.cluster.node.protocol.threads>
<nifi.cluster.node.protocol.max.threads>50</nifi.cluster.node.protocol.max.threads> <nifi.cluster.node.protocol.max.threads>50</nifi.cluster.node.protocol.max.threads>
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size> <nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout> <nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>

View File

@ -261,7 +261,6 @@ nifi.cluster.protocol.is.secure=${nifi.cluster.protocol.is.secure}
nifi.cluster.is.node=${nifi.cluster.is.node} nifi.cluster.is.node=${nifi.cluster.is.node}
nifi.cluster.node.address=${nifi.cluster.node.address} nifi.cluster.node.address=${nifi.cluster.node.address}
nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port} nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port}
nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads} nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}

View File

@ -55,6 +55,7 @@
var nfSnippet; var nfSnippet;
var nfBirdseye; var nfBirdseye;
var nfGraph; var nfGraph;
var trimLengthCaches = {};
var restrictedUsage = d3.map(); var restrictedUsage = d3.map();
var requiredPermissions = d3.map(); var requiredPermissions = d3.map();
@ -707,13 +708,21 @@
}); });
}, },
/**
* Clears the cache used to avoid calculating whether or not ellipses are needed for a given text element
*/
clearEllipsisCache: function () {
trimLengthCaches = {};
},
/** /**
* Applies single line ellipsis to the component in the specified selection if necessary. * Applies single line ellipsis to the component in the specified selection if necessary.
* *
* @param {selection} selection * @param {selection} selection
* @param {string} text * @param {string} text
* @param {cacheName} string
*/ */
ellipsis: function (selection, text) { ellipsis: function (selection, text, cacheName) {
text = text.trim(); text = text.trim();
var width = parseInt(selection.attr('width'), 10); var width = parseInt(selection.attr('width'), 10);
var node = selection.node(); var node = selection.node();
@ -721,8 +730,26 @@
// set the element text // set the element text
selection.text(text); selection.text(text);
// see if the field is too big for the field // Never apply ellipses to text less than 5 characters and don't keep it in the cache
if (text.length > 0 && node.getSubStringLength(0, text.length - 1) > width) { // because it could take up a lot of space unnecessarily.
var textLength = text.length;
if (textLength < 5) {
return;
}
// Check our cache of text lengths to see if we already know how much to trim it to
var trimLengths = trimLengthCaches[cacheName];
if (trimLengths === undefined) {
trimLengths = {};
trimLengthCaches[cacheName] = trimLengths;
}
var cacheForText = trimLengths[text];
var trimLength = (cacheForText === undefined) ? undefined : cacheForText[width];
if (trimLength === undefined) {
// We haven't cached the length for this text yet. Determine whether we need
// to trim & add ellipses or not
if (node.getSubStringLength(0, text.length - 1) > width) {
// make some room for the ellipsis // make some room for the ellipsis
width -= 5; width -= 5;
@ -739,9 +766,26 @@
return 0; return 0;
}); });
// trim at the appropriate length and add ellipsis trimLength = i;
selection.text(text.substring(0, i) + String.fromCharCode(8230)); } else {
// trimLength of -1 indicates we do not need ellipses
trimLength = -1;
} }
// TODO: Can we clear this when process group changes?
// Store the trim length in our cache
if (trimLengths[text] === undefined) {
trimLengths[text] = {};
}
trimLengths[text][width] = trimLength;
}
if (trimLength === -1) {
return;
}
// trim at the appropriate length and add ellipsis
selection.text(text.substring(0, trimLength) + String.fromCharCode(8230));
}, },
/** /**
@ -751,8 +795,9 @@
* @param {selection} selection * @param {selection} selection
* @param {integer} lineCount * @param {integer} lineCount
* @param {string} text * @param {string} text
* @param {string} cacheName
*/ */
multilineEllipsis: function (selection, lineCount, text) { multilineEllipsis: function (selection, lineCount, text, cacheName) {
var i = 1; var i = 1;
var words = text.split(/\s+/).reverse(); var words = text.split(/\s+/).reverse();
@ -801,7 +846,7 @@
var remainder = [word].concat(words.reverse()); var remainder = [word].concat(words.reverse());
// apply ellipsis to the last line // apply ellipsis to the last line
nfCanvasUtils.ellipsis(tspan, remainder.join(' ')); nfCanvasUtils.ellipsis(tspan, remainder.join(' '), cacheName);
// we've reached the line count // we've reached the line count
break; break;

View File

@ -231,6 +231,9 @@
var currentProcessGroup = nfCanvas.getGroupId(); var currentProcessGroup = nfCanvas.getGroupId();
var currentParameterContext = nfCanvas.getParameterContext(); var currentParameterContext = nfCanvas.getParameterContext();
// clear caches because what is in the cache may not be applicable and we don't want the caches to grow indefinitely.
nfCanvasUtils.clearEllipsisCache();
// update process group id and attempt to reload // update process group id and attempt to reload
nfCanvas.setGroupId(processGroupId); nfCanvas.setGroupId(processGroupId);
var processGroupXhr = reloadProcessGroup(options); var processGroupXhr = reloadProcessGroup(options);

View File

@ -931,7 +931,7 @@
connectionFromLabel.text(null).selectAll('title').remove(); connectionFromLabel.text(null).selectAll('title').remove();
// apply ellipsis to the label as necessary // apply ellipsis to the label as necessary
nfCanvasUtils.ellipsis(connectionFromLabel, d.component.source.name); nfCanvasUtils.ellipsis(connectionFromLabel, d.component.source.name, 'connection-from');
}).append('title').text(function () { }).append('title').text(function () {
return d.component.source.name; return d.component.source.name;
}); });
@ -1040,7 +1040,7 @@
connectionToLabel.text(null).selectAll('title').remove(); connectionToLabel.text(null).selectAll('title').remove();
// apply ellipsis to the label as necessary // apply ellipsis to the label as necessary
nfCanvasUtils.ellipsis(connectionToLabel, d.component.destination.name); nfCanvasUtils.ellipsis(connectionToLabel, d.component.destination.name, 'connection-to');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return d.component.destination.name; return d.component.destination.name;
}); });
@ -1145,7 +1145,7 @@
connectionToLabel.text(null).selectAll('title').remove(); connectionToLabel.text(null).selectAll('title').remove();
// apply ellipsis to the label as necessary // apply ellipsis to the label as necessary
nfCanvasUtils.ellipsis(connectionToLabel, connectionNameValue); nfCanvasUtils.ellipsis(connectionToLabel, connectionNameValue, 'connection-name');
}).append('title').text(function () { }).append('title').text(function () {
return connectionNameValue; return connectionNameValue;
}); });

View File

@ -365,9 +365,9 @@
// handle based on the number of tokens in the port name // handle based on the number of tokens in the port name
if (words.length === 1) { if (words.length === 1) {
// apply ellipsis to the port name as necessary // apply ellipsis to the port name as necessary
nfCanvasUtils.ellipsis(portName, name); nfCanvasUtils.ellipsis(portName, name, 'port-name');
} else { } else {
nfCanvasUtils.multilineEllipsis(portName, 2, name); nfCanvasUtils.multilineEllipsis(portName, 2, name, 'port-name');
} }
}).attrs({ }).attrs({
'y': offsetY(25) 'y': offsetY(25)

View File

@ -1239,7 +1239,7 @@
processGroupName.text(null).selectAll('title').remove(); processGroupName.text(null).selectAll('title').remove();
// apply ellipsis to the process group name as necessary // apply ellipsis to the process group name as necessary
nfCanvasUtils.ellipsis(processGroupName, d.component.name); nfCanvasUtils.ellipsis(processGroupName, d.component.name, 'group-name');
}) })
.append('title') .append('title')
.text(function (d) { .text(function (d) {

View File

@ -597,7 +597,7 @@
processorName.text(null).selectAll('title').remove(); processorName.text(null).selectAll('title').remove();
// apply ellipsis to the processor name as necessary // apply ellipsis to the processor name as necessary
nfCanvasUtils.ellipsis(processorName, d.component.name); nfCanvasUtils.ellipsis(processorName, d.component.name, 'processor-name');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return d.component.name; return d.component.name;
}); });
@ -611,7 +611,7 @@
processorType.text(null).selectAll('title').remove(); processorType.text(null).selectAll('title').remove();
// apply ellipsis to the processor type as necessary // apply ellipsis to the processor type as necessary
nfCanvasUtils.ellipsis(processorType, nfCommon.formatType(d.component)); nfCanvasUtils.ellipsis(processorType, nfCommon.formatType(d.component), 'processor-type');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return nfCommon.formatType(d.component); return nfCommon.formatType(d.component);
}); });
@ -625,7 +625,7 @@
processorBundle.text(null).selectAll('title').remove(); processorBundle.text(null).selectAll('title').remove();
// apply ellipsis to the processor type as necessary // apply ellipsis to the processor type as necessary
nfCanvasUtils.ellipsis(processorBundle, nfCommon.formatBundle(d.component.bundle)); nfCanvasUtils.ellipsis(processorBundle, nfCommon.formatBundle(d.component.bundle), 'processor-bundle');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return nfCommon.formatBundle(d.component.bundle); return nfCommon.formatBundle(d.component.bundle);
}); });

View File

@ -505,7 +505,7 @@
remoteProcessGroupUri.text(null).selectAll('title').remove(); remoteProcessGroupUri.text(null).selectAll('title').remove();
// apply ellipsis to the remote process group name as necessary // apply ellipsis to the remote process group name as necessary
nfCanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUris); nfCanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUris, 'rpg-uri');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return d.component.name; return d.component.name;
}); });
@ -604,7 +604,7 @@
remoteProcessGroupName.text(null).selectAll('title').remove(); remoteProcessGroupName.text(null).selectAll('title').remove();
// apply ellipsis to the remote process group name as necessary // apply ellipsis to the remote process group name as necessary
nfCanvasUtils.ellipsis(remoteProcessGroupName, d.component.name); nfCanvasUtils.ellipsis(remoteProcessGroupName, d.component.name, 'rpg-name');
}).append('title').text(function (d) { }).append('title').text(function (d) {
return d.component.name; return d.component.name;
}); });