diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index d51dea44b2..beb222a496 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -73,6 +73,7 @@ public abstract class NiFiProperties { public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; + public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java index c6b98a05f9..a35be863b3 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java @@ -32,6 +32,7 @@ public abstract class AbstractSiteToSiteClient implements SiteToSiteClient { siteInfoProvider.setSslContext(config.getSslContext()); siteInfoProvider.setConnectTimeoutMillis(commsTimeout); siteInfoProvider.setReadTimeoutMillis(commsTimeout); + siteInfoProvider.setCachedContentsExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); 
siteInfoProvider.setProxy(config.getHttpProxy()); siteInfoProvider.setLocalAddress(config.getLocalAddress()); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index a7bd0945f0..a4439673d2 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -22,10 +22,10 @@ import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java index 06b939dff1..f8151da7be 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -59,6 +59,7 @@ public class SiteInfoProvider { private SSLContext sslContext; private int connectTimeoutMillis; private int readTimeoutMillis; + private long cachedContentExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private ControllerDTO refreshRemoteInfo() throws IOException { @@ -104,6 +105,7 @@ public class SiteInfoProvider { apiClient.setConnectTimeoutMillis(connectTimeoutMillis); apiClient.setReadTimeoutMillis(readTimeoutMillis); apiClient.setLocalAddress(localAddress); + 
apiClient.setCacheExpirationMillis(cachedContentExpirationMillis); return apiClient; } @@ -272,6 +274,10 @@ public class SiteInfoProvider { this.readTimeoutMillis = readTimeoutMillis; } + public void setCachedContentsExpirationMillis(long expirationMillis) { + this.cachedContentExpirationMillis = expirationMillis; + } + public void setProxy(HttpProxy proxy) { this.proxy = proxy; } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index daff70d662..39dbda516e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -154,6 +154,7 @@ public interface SiteToSiteClient extends Closeable { private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private long contentsCacheExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private SSLContext sslContext; private String keystoreFilename; private String keystorePass; @@ -184,6 +185,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.contentsCacheExpirationMillis = config.getCacheExpiration(TimeUnit.MILLISECONDS); this.sslContext = config.getSslContext(); this.keystoreFilename = config.getKeystoreFilename(); this.keystorePass = config.getKeystorePassword(); @@ -273,6 +275,19 @@ public interface SiteToSiteClient extends Closeable { return this; } + /** + * Specifies how long the contents of a remote NiFi instance should be 
cached before making + * another web request to the remote instance. + * + * @param expirationPeriod the amount of time after which an entry in the cache should expire + * @param unit unit of time over which to interpret the given expirationPeriod + * @return the builder + */ + public Builder cacheExpiration(final long expirationPeriod, final TimeUnit unit) { + this.contentsCacheExpirationMillis = unit.toMillis(expirationPeriod); + return this; + } + /** * Specifies the amount of time that a connection can remain idle in the * connection pool before it is "expired" and shutdown. The default @@ -722,6 +737,7 @@ public interface SiteToSiteClient extends Closeable { private final long timeoutNanos; private final long penalizationNanos; private final long idleExpirationNanos; + private final long contentsCacheExpirationMillis; private final SSLContext sslContext; private final String keystoreFilename; private final String keystorePass; @@ -746,6 +762,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = 0; this.penalizationNanos = 0; this.idleExpirationNanos = 0; + this.contentsCacheExpirationMillis = 30000L; this.sslContext = null; this.keystoreFilename = null; this.keystorePass = null; @@ -773,6 +790,7 @@ public interface SiteToSiteClient extends Closeable { this.timeoutNanos = builder.timeoutNanos; this.penalizationNanos = builder.penalizationNanos; this.idleExpirationNanos = builder.idleExpirationNanos; + this.contentsCacheExpirationMillis = builder.contentsCacheExpirationMillis; this.sslContext = builder.sslContext; this.keystoreFilename = builder.keystoreFilename; this.keystorePass = builder.keystorePass; @@ -816,6 +834,11 @@ public interface SiteToSiteClient extends Closeable { return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); } + @Override + public long getCacheExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(contentsCacheExpirationMillis, TimeUnit.MILLISECONDS); + } + @Override public long 
getIdleConnectionExpiration(final TimeUnit timeUnit) { return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 83e8328471..8da5e706b1 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -162,6 +162,17 @@ public interface SiteToSiteClientConfig extends Serializable { */ int getPreferredBatchCount(); + /** + * When the contents of a remote NiFi instance are fetched, that information is cached + * so that many calls that are made in a short period of time do not overwhelm the remote + * NiFi instance. This method will indicate the length of time, in the given unit, that this information + * can be cached. 
+ * + * @param unit the desired time unit + * @return the length of time, expressed in the given unit, that the contents of a remote NiFi instance will be cached + */ + long getCacheExpiration(TimeUnit unit); + /** * @return the EventReporter that is to be used by clients to report events */ diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index f0bdcf143e..4213dac67a 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -106,6 +106,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); final Collection peers = apiClient.getPeers(); @@ -154,6 +155,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr apiClient.setBaseUrl(peer.getUrl()); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); apiClient.setCompress(config.isUseCompression()); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index f4de3713e9..1670a532a7 100644 --- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -76,8 +76,6 @@ import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.reporting.Severity; import org.apache.nifi.security.util.CertificateUtils; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.remote.PeerDTO; @@ -90,6 +88,9 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -110,8 +111,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -176,11 +180,14 @@ public class SiteToSiteRestApiClient implements Closeable { private int connectTimeoutMillis; private int readTimeoutMillis; + private long cacheExpirationMillis = 30000L; private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$"); private Future postResult; private CountDownLatch transferDataLatch = new CountDownLatch(1); + private static final ConcurrentMap contentsMap = new ConcurrentHashMap<>(); + private volatile long lastPruneTimestamp = System.currentTimeMillis(); public 
SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) { this.sslContext = sslContext; @@ -268,7 +275,7 @@ public class SiteToSiteRestApiClient implements Closeable { final HttpClientBuilder clientBuilder = HttpClients.custom(); if (sslContext != null) { - clientBuilder.setSslcontext(sslContext); + clientBuilder.setSSLContext(sslContext); clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor()); } @@ -368,10 +375,48 @@ public class SiteToSiteRestApiClient implements Closeable { } private ControllerDTO getController() throws IOException { + // first check cache and prune any old values. + // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed + // from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. + if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) { + pruneCache(); + } + + final String internedUrl = baseUrl.intern(); + synchronized (internedUrl) { + final RemoteGroupContents groupContents = contentsMap.get(internedUrl); + + if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(cacheExpirationMillis)) { + logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", internedUrl); + + final ControllerDTO refreshedContents; + try { + refreshedContents = fetchController(); + } catch (final Exception e) { + // we failed to refresh contents, but we don't want to constantly poll the remote instance, failing. + // So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp. + final ControllerDTO existingController = groupContents == null ? 
null : groupContents.getContents(); + final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController); + contentsMap.put(internedUrl, updatedContents); + throw e; + } + + logger.debug("Successfully retrieved contents for remote group at URL {}", internedUrl); + + final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents); + contentsMap.put(internedUrl, updatedContents); + return refreshedContents; + } + + logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. Will return the cached value.", internedUrl); + return groupContents.getContents(); + } + } + + private ControllerDTO fetchController() throws IOException { try { final HttpGet get = createGetControllerRequest(); return execute(get, ControllerEntity.class).getController(); - } catch (final HttpGetFailedException e) { if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) { logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url..."); @@ -382,6 +427,20 @@ public class SiteToSiteRestApiClient implements Closeable { } } + private void pruneCache() { + for (final Map.Entry entry : contentsMap.entrySet()) { + final String url = entry.getKey(); + final RemoteGroupContents contents = entry.getValue(); + + // If any entry in the map has not been refreshed for more than 5 minutes, + // then we can go ahead and remove it from the map. We use a fixed 5-minute threshold + // so that we are unlikely to race with the above #getController. 
+ if (contents.isOlderThan(TimeUnit.MINUTES.toMillis(5))) { + contentsMap.remove(url, contents); + } + } + } + private HttpGet createGetControllerRequest() { final HttpGet get = createGet("/site-to-site"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); @@ -1211,6 +1270,10 @@ public class SiteToSiteRestApiClient implements Closeable { this.readTimeoutMillis = readTimeoutMillis; } + public void setCacheExpirationMillis(final long expirationMillis) { + this.cacheExpirationMillis = expirationMillis; + } + public static String getFirstUrl(final String clusterUrlStr) { if (clusterUrlStr == null) { return null; @@ -1452,4 +1515,22 @@ public class SiteToSiteRestApiClient implements Closeable { } + private static class RemoteGroupContents { + private final ControllerDTO contents; + private final long timestamp; + + public RemoteGroupContents(final ControllerDTO contents) { + this.contents = contents; + this.timestamp = System.currentTimeMillis(); + } + + public ControllerDTO getContents() { + return contents; + } + + public boolean isOlderThan(final long millis) { + final long millisSinceRefresh = System.currentTimeMillis() - timestamp; + return millisSinceRefresh > millis; + } + } } diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index b0e68d5b60..12a50a9464 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2964,6 +2964,9 @@ Remote Process Groups can choose transport protocol from RAW and HTTP. Propertie Whether a Site-to-Site client uses HTTP or HTTPS is determined by `nifi.remote.input.secure`. If it is set to `true`, then requests are sent as HTTPS to `nifi.web.https.port`. If set to `false`, HTTP requests are sent to `nifi.web.http.port`. |nifi.remote.input.http.transaction.ttl|Specifies how long a transaction can stay alive on the server. 
By default, it is set to `30 secs`. + If a Site-to-Site client hasn’t proceeded to the next action after this period of time, the transaction is discarded from the remote NiFi instance. For example, when a client creates a transaction but doesn’t send or receive flow files, or when a client sends or receives flow files but doesn’t confirm that transaction. +|nifi.remote.contents.cache.expiration|Specifies how long NiFi should cache information about a remote NiFi instance when communicating via Site-to-Site. By default, NiFi will cache the + +responses from the remote system for `30 secs`. This allows NiFi to avoid constantly making HTTP requests to the remote system, which is particularly important when this instance of NiFi + +has many instances of Remote Process Groups. |==== === Web Properties diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 49fd90fc8b..8689d7156b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -102,6 +102,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final ProcessScheduler scheduler; private final EventReporter eventReporter; private final NiFiProperties nifiProperties; + private final long remoteContentsCacheExpiration; private final AtomicReference name = new AtomicReference<>(); private final AtomicReference position = new AtomicReference<>(new Position(0D, 0D)); @@ -156,6 +157,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { this.scheduler = 
flowController.getProcessScheduler(); this.authorizationIssue = "Establishing connection to " + targetUris; + final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs"); + remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS); + final BulletinRepository bulletinRepository = flowController.getBulletinRepository(); eventReporter = new EventReporter() { private static final long serialVersionUID = 1L; @@ -963,7 +967,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(getLocalAddress()); - + apiClient.setCacheExpirationMillis(remoteContentsCacheExpiration); return apiClient; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index f867ed3ae1..624f2f1883 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -124,6 +124,7 @@ nifi.remote.input.secure=false nifi.remote.input.socket.port= nifi.remote.input.http.enabled=true nifi.remote.input.http.transaction.ttl=30 sec +nifi.remote.contents.cache.expiration=30 secs # web properties # nifi.web.war.directory=${nifi.web.war.directory}