From af3a578711b7ed574ab37257822baa67fdbe8a26 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 13 Nov 2017 13:17:18 -0500 Subject: [PATCH] NIFI-4598: When we retrieve the 'controller' from a remote NiFi instance in order to determine which ports are available, cache those results for up to some configurable amount of time (default 30 secs) so that we don't constantly issue HTTP Requests to the remote nifi This closes #2270. Signed-off-by: Bryan Bende --- .../org/apache/nifi/util/NiFiProperties.java | 1 + .../client/AbstractSiteToSiteClient.java | 1 + .../nifi/remote/client/PeerSelector.java | 2 +- .../nifi/remote/client/SiteInfoProvider.java | 6 ++ .../nifi/remote/client/SiteToSiteClient.java | 23 +++++ .../remote/client/SiteToSiteClientConfig.java | 11 +++ .../nifi/remote/client/http/HttpClient.java | 2 + .../remote/util/SiteToSiteRestApiClient.java | 89 ++++++++++++++++++- .../main/asciidoc/administration-guide.adoc | 3 + .../remote/StandardRemoteProcessGroup.java | 6 +- .../src/main/resources/conf/nifi.properties | 1 + 11 files changed, 139 insertions(+), 6 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index d51dea44b2..beb222a496 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -73,6 +73,7 @@ public abstract class NiFiProperties { public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; + public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; public static final String TEMPLATE_DIRECTORY = 
"nifi.templates.directory"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java index c6b98a05f9..a35be863b3 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java @@ -32,6 +32,7 @@ public abstract class AbstractSiteToSiteClient implements SiteToSiteClient { siteInfoProvider.setSslContext(config.getSslContext()); siteInfoProvider.setConnectTimeoutMillis(commsTimeout); siteInfoProvider.setReadTimeoutMillis(commsTimeout); + siteInfoProvider.setCachedContentsExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); siteInfoProvider.setProxy(config.getHttpProxy()); siteInfoProvider.setLocalAddress(config.getLocalAddress()); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index a7bd0945f0..a4439673d2 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -22,10 +22,10 @@ import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java index 06b939dff1..f8151da7be 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -59,6 +59,7 @@ public class SiteInfoProvider { private SSLContext sslContext; private int connectTimeoutMillis; private int readTimeoutMillis; + private long cachedContentExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private ControllerDTO refreshRemoteInfo() throws IOException { @@ -104,6 +105,7 @@ public class SiteInfoProvider { apiClient.setConnectTimeoutMillis(connectTimeoutMillis); apiClient.setReadTimeoutMillis(readTimeoutMillis); apiClient.setLocalAddress(localAddress); + apiClient.setCacheExpirationMillis(cachedContentExpirationMillis); return apiClient; } @@ -272,6 +274,10 @@ public class SiteInfoProvider { this.readTimeoutMillis = readTimeoutMillis; } + public void setCachedContentsExpirationMillis(long expirationMillis) { + this.cachedContentExpirationMillis = expirationMillis; + } + public void setProxy(HttpProxy proxy) { this.proxy = proxy; } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index daff70d662..39dbda516e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -154,6 +154,7 @@ public interface 
SiteToSiteClient extends Closeable { private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private long contentsCacheExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private SSLContext sslContext; private String keystoreFilename; private String keystorePass; @@ -184,6 +185,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.contentsCacheExpirationMillis = config.getCacheExpiration(TimeUnit.MILLISECONDS); this.sslContext = config.getSslContext(); this.keystoreFilename = config.getKeystoreFilename(); this.keystorePass = config.getKeystorePassword(); @@ -273,6 +275,19 @@ public interface SiteToSiteClient extends Closeable { return this; } + /** + * Specifies how long the contents of a remote NiFi instance should be cached before making + * another web request to the remote instance. + * + * @param expirationPeriod the amount of time after which an entry in the cache should expire + * @param unit unit of time over which to interpret the given expirationPeriod + * @return the builder + */ + public Builder cacheExpiration(final long expirationPeriod, final TimeUnit unit) { + this.contentsCacheExpirationMillis = unit.toMillis(expirationPeriod); + return this; + } + /** * Specifies the amount of time that a connection can remain idle in the * connection pool before it is "expired" and shutdown. 
The default @@ -722,6 +737,7 @@ public interface SiteToSiteClient extends Closeable { private final long timeoutNanos; private final long penalizationNanos; private final long idleExpirationNanos; + private final long contentsCacheExpirationMillis; private final SSLContext sslContext; private final String keystoreFilename; private final String keystorePass; @@ -746,6 +762,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = 0; this.penalizationNanos = 0; this.idleExpirationNanos = 0; + this.contentsCacheExpirationMillis = 30000L; this.sslContext = null; this.keystoreFilename = null; this.keystorePass = null; @@ -773,6 +790,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = builder.timeoutNanos; this.penalizationNanos = builder.penalizationNanos; this.idleExpirationNanos = builder.idleExpirationNanos; + this.contentsCacheExpirationMillis = builder.contentsCacheExpirationMillis; this.sslContext = builder.sslContext; this.keystoreFilename = builder.keystoreFilename; this.keystorePass = builder.keystorePass; @@ -816,6 +834,11 @@ public interface SiteToSiteClient extends Closeable { return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); } + @Override + public long getCacheExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(contentsCacheExpirationMillis, TimeUnit.MILLISECONDS); + } + @Override public long getIdleConnectionExpiration(final TimeUnit timeUnit) { return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 83e8328471..8da5e706b1 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -162,6 +162,17 @@ public interface SiteToSiteClientConfig extends Serializable { */ int getPreferredBatchCount(); + /** + * When the contents of a remote NiFi instance are fetched, that information is cached + * so that many calls that are made in a short period of time do not overwhelm the remote + * NiFi instance. This method will indicate the amount of time, in the given unit, that this information + * can be cached. + * + * @param unit the desired time unit + * @return the amount of time, expressed in the given unit, that the contents of a remote NiFi instance will be cached + */ + long getCacheExpiration(TimeUnit unit); + /** * @return the EventReporter that is to be used by clients to report events */ diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index f0bdcf143e..4213dac67a 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -106,6 +106,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); final Collection peers = apiClient.getPeers(); @@ -154,6 +155,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr apiClient.setBaseUrl(peer.getUrl()); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + 
apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); apiClient.setCompress(config.isUseCompression()); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index f4de3713e9..1670a532a7 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -76,8 +76,6 @@ import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.reporting.Severity; import org.apache.nifi.security.util.CertificateUtils; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.remote.PeerDTO; @@ -90,6 +88,9 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -110,8 +111,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -176,11 +180,14 @@ public class SiteToSiteRestApiClient 
implements Closeable { private int connectTimeoutMillis; private int readTimeoutMillis; + private long cacheExpirationMillis = 30000L; private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$"); private Future postResult; private CountDownLatch transferDataLatch = new CountDownLatch(1); + private static final ConcurrentMap contentsMap = new ConcurrentHashMap<>(); + private volatile long lastPruneTimestamp = System.currentTimeMillis(); public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) { this.sslContext = sslContext; @@ -268,7 +275,7 @@ public class SiteToSiteRestApiClient implements Closeable { final HttpClientBuilder clientBuilder = HttpClients.custom(); if (sslContext != null) { - clientBuilder.setSslcontext(sslContext); + clientBuilder.setSSLContext(sslContext); clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor()); } @@ -368,10 +375,48 @@ public class SiteToSiteRestApiClient implements Closeable { } private ControllerDTO getController() throws IOException { + // first check cache and prune any old values. + // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed + // from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. 
+ if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) { + pruneCache(); + } + + final String internedUrl = baseUrl.intern(); + synchronized (internedUrl) { + final RemoteGroupContents groupContents = contentsMap.get(internedUrl); + + if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(cacheExpirationMillis)) { + logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", internedUrl); + + final ControllerDTO refreshedContents; + try { + refreshedContents = fetchController(); + } catch (final Exception e) { + // we failed to refresh contents, but we don't want to constantly poll the remote instance, failing. + // So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp. + final ControllerDTO existingController = groupContents == null ? null : groupContents.getContents(); + final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController); + contentsMap.put(internedUrl, updatedContents); + throw e; + } + + logger.debug("Successfully retrieved contents for remote group at URL {}", internedUrl); + + final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents); + contentsMap.put(internedUrl, updatedContents); + return refreshedContents; + } + + logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. 
Will return the cached value.", internedUrl); + return groupContents.getContents(); + } + } + + private ControllerDTO fetchController() throws IOException { try { final HttpGet get = createGetControllerRequest(); return execute(get, ControllerEntity.class).getController(); - } catch (final HttpGetFailedException e) { if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) { logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url..."); @@ -382,6 +427,20 @@ public class SiteToSiteRestApiClient implements Closeable { } } + private void pruneCache() { + for (final Map.Entry entry : contentsMap.entrySet()) { + final String url = entry.getKey(); + final RemoteGroupContents contents = entry.getValue(); + + // If any entry in the map is more than 5 minutes old, then we can go ahead + // and remove it from the map. We use a fixed period well beyond the default cache expiration + // just to ensure that we don't have any race condition with the above #getController. + if (contents.isOlderThan(TimeUnit.MINUTES.toMillis(5))) { + contentsMap.remove(url, contents); + } + } + } + private HttpGet createGetControllerRequest() { final HttpGet get = createGet("/site-to-site"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); @@ -1211,6 +1270,10 @@ public class SiteToSiteRestApiClient implements Closeable { this.readTimeoutMillis = readTimeoutMillis; } + public void setCacheExpirationMillis(final long expirationMillis) { + this.cacheExpirationMillis = expirationMillis; + } + public static String getFirstUrl(final String clusterUrlStr) { if (clusterUrlStr == null) { return null; @@ -1452,4 +1515,22 @@ public class SiteToSiteRestApiClient implements Closeable { } + private static class RemoteGroupContents { + private final ControllerDTO contents; + private final long timestamp; + + public RemoteGroupContents(final ControllerDTO contents) { + this.contents = contents; + this.timestamp = 
System.currentTimeMillis(); + } + + public ControllerDTO getContents() { + return contents; + } + + public boolean isOlderThan(final long millis) { + final long millisSinceRefresh = System.currentTimeMillis() - timestamp; + return millisSinceRefresh > millis; + } + } } diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index b0e68d5b60..12a50a9464 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2964,6 +2964,9 @@ Remote Process Groups can choose transport protocol from RAW and HTTP. Propertie Whether a Site-to-Site client uses HTTP or HTTPS is determined by `nifi.remote.input.secure`. If it is set to `true`, then requests are sent as HTTPS to `nifi.web.https.port`. If set to `false`, HTTP requests are sent to `nifi.web.http.port`. |nifi.remote.input.http.transaction.ttl|Specifies how long a transaction can stay alive on the server. By default, it is set to `30 secs`. + If a Site-to-Site client hasn’t proceeded to the next action after this period of time, the transaction is discarded from the remote NiFi instance. For example, when a client creates a transaction but doesn’t send or receive flow files, or when a client sends or receives flow files but doesn’t confirm that transaction. +|nifi.remote.contents.cache.expiration|Specifies how long NiFi should cache information about a remote NiFi instance when communicating via Site-to-Site. By default, NiFi will cache the + +responses from the remote system for `30 secs`. This allows NiFi to avoid constantly making HTTP requests to the remote system, which is particularly important when this instance of NiFi + +has many instances of Remote Process Groups. 
|==== === Web Properties diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 49fd90fc8b..8689d7156b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -102,6 +102,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final ProcessScheduler scheduler; private final EventReporter eventReporter; private final NiFiProperties nifiProperties; + private final long remoteContentsCacheExpiration; private final AtomicReference name = new AtomicReference<>(); private final AtomicReference position = new AtomicReference<>(new Position(0D, 0D)); @@ -156,6 +157,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { this.scheduler = flowController.getProcessScheduler(); this.authorizationIssue = "Establishing connection to " + targetUris; + final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs"); + remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS); + final BulletinRepository bulletinRepository = flowController.getBulletinRepository(); eventReporter = new EventReporter() { private static final long serialVersionUID = 1L; @@ -963,7 +967,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); 
apiClient.setLocalAddress(getLocalAddress()); - + apiClient.setCacheExpirationMillis(remoteContentsCacheExpiration); return apiClient; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index f867ed3ae1..624f2f1883 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -124,6 +124,7 @@ nifi.remote.input.secure=false nifi.remote.input.socket.port= nifi.remote.input.http.enabled=true nifi.remote.input.http.transaction.ttl=30 sec +nifi.remote.contents.cache.expiration=30 secs # web properties # nifi.web.war.directory=${nifi.web.war.directory}