NIFI-4598: When we retrieve the 'controller' from a remote NiFi instance in order to determine which ports are available, cache those results for up to some configurable amount of time (default 30 secs) so that we don't constantly issue HTTP Requests to the remote nifi

This closes #2270.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-11-13 13:17:18 -05:00 committed by Bryan Bende
parent c7a5a09b8a
commit af3a578711
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
11 changed files with 139 additions and 6 deletions

View File

@ -73,6 +73,7 @@ public abstract class NiFiProperties {
public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure";
public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled";
public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl";
public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration";
public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";

View File

@ -32,6 +32,7 @@ public abstract class AbstractSiteToSiteClient implements SiteToSiteClient {
siteInfoProvider.setSslContext(config.getSslContext());
siteInfoProvider.setConnectTimeoutMillis(commsTimeout);
siteInfoProvider.setReadTimeoutMillis(commsTimeout);
siteInfoProvider.setCachedContentsExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
siteInfoProvider.setProxy(config.getHttpProxy());
siteInfoProvider.setLocalAddress(config.getLocalAddress());
}

View File

@ -22,10 +22,10 @@ import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;

View File

@ -59,6 +59,7 @@ public class SiteInfoProvider {
private SSLContext sslContext;
private int connectTimeoutMillis;
private int readTimeoutMillis;
private long cachedContentExpirationMillis = TimeUnit.SECONDS.toMillis(30L);
private ControllerDTO refreshRemoteInfo() throws IOException {
@ -104,6 +105,7 @@ public class SiteInfoProvider {
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
apiClient.setLocalAddress(localAddress);
apiClient.setCacheExpirationMillis(cachedContentExpirationMillis);
return apiClient;
}
@ -272,6 +274,10 @@ public class SiteInfoProvider {
this.readTimeoutMillis = readTimeoutMillis;
}
/**
 * Stores the expiration period for cached remote contents. The stored value is
 * later handed to the API client through {@code setCacheExpirationMillis}.
 *
 * @param expirationMillis the cache expiration period, in milliseconds
 */
public void setCachedContentsExpirationMillis(long expirationMillis) {
    cachedContentExpirationMillis = expirationMillis;
}
/**
 * Replaces the proxy held by this provider with the given one.
 *
 * @param proxy the proxy to store
 */
public void setProxy(HttpProxy proxy) {
this.proxy = proxy;
}

View File

@ -154,6 +154,7 @@ public interface SiteToSiteClient extends Closeable {
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
private long contentsCacheExpirationMillis = TimeUnit.SECONDS.toMillis(30L);
private SSLContext sslContext;
private String keystoreFilename;
private String keystorePass;
@ -184,6 +185,7 @@ public interface SiteToSiteClient extends Closeable {
this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
this.contentsCacheExpirationMillis = config.getCacheExpiration(TimeUnit.MILLISECONDS);
this.sslContext = config.getSslContext();
this.keystoreFilename = config.getKeystoreFilename();
this.keystorePass = config.getKeystorePassword();
@ -273,6 +275,19 @@ public interface SiteToSiteClient extends Closeable {
return this;
}
/**
 * Configures how long the contents of a remote NiFi instance may be served
 * from cache before another web request is made to that instance.
 *
 * @param expirationPeriod length of time after which a cached entry expires
 * @param unit the time unit in which {@code expirationPeriod} is expressed
 * @return the builder
 */
public Builder cacheExpiration(final long expirationPeriod, final TimeUnit unit) {
    // The builder keeps the period in milliseconds regardless of the unit supplied.
    final long expirationMillis = TimeUnit.MILLISECONDS.convert(expirationPeriod, unit);
    this.contentsCacheExpirationMillis = expirationMillis;
    return this;
}
/**
* Specifies the amount of time that a connection can remain idle in the
* connection pool before it is "expired" and shutdown. The default
@ -722,6 +737,7 @@ public interface SiteToSiteClient extends Closeable {
private final long timeoutNanos;
private final long penalizationNanos;
private final long idleExpirationNanos;
private final long contentsCacheExpirationMillis;
private final SSLContext sslContext;
private final String keystoreFilename;
private final String keystorePass;
@ -746,6 +762,7 @@ public interface SiteToSiteClient extends Closeable {
this.timeoutNanos = 0;
this.penalizationNanos = 0;
this.idleExpirationNanos = 0;
this.contentsCacheExpirationMillis = 30000L;
this.sslContext = null;
this.keystoreFilename = null;
this.keystorePass = null;
@ -773,6 +790,7 @@ public interface SiteToSiteClient extends Closeable {
this.timeoutNanos = builder.timeoutNanos;
this.penalizationNanos = builder.penalizationNanos;
this.idleExpirationNanos = builder.idleExpirationNanos;
this.contentsCacheExpirationMillis = builder.contentsCacheExpirationMillis;
this.sslContext = builder.sslContext;
this.keystoreFilename = builder.keystoreFilename;
this.keystorePass = builder.keystorePass;
@ -816,6 +834,11 @@ public interface SiteToSiteClient extends Closeable {
return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
}
/**
 * Reports the remote-contents cache expiration, converting the millisecond
 * value held by this configuration into the requested unit.
 *
 * @param timeUnit the unit in which to express the result
 * @return the cache expiration period expressed in {@code timeUnit}
 */
@Override
public long getCacheExpiration(final TimeUnit timeUnit) {
    final long expirationMillis = contentsCacheExpirationMillis;
    return timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
}
@Override
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);

View File

@ -162,6 +162,17 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
int getPreferredBatchCount();
/**
 * When the contents of a remote NiFi instance are fetched, that information is cached
 * so that many calls that are made in a short period of time do not overwhelm the remote
 * NiFi instance. This method indicates how long that information can be cached,
 * expressed in the requested time unit.
 *
 * @param unit the desired time unit
 * @return the amount of time, in the given unit, that the contents of a remote NiFi instance will be cached
 */
long getCacheExpiration(TimeUnit unit);
/**
* @return the EventReporter that is to be used by clients to report events
*/

View File

@ -106,6 +106,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
apiClient.setConnectTimeoutMillis(timeoutMillis);
apiClient.setReadTimeoutMillis(timeoutMillis);
apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
apiClient.setLocalAddress(config.getLocalAddress());
final Collection<PeerDTO> peers = apiClient.getPeers();
@ -154,6 +155,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
apiClient.setBaseUrl(peer.getUrl());
apiClient.setConnectTimeoutMillis(timeoutMillis);
apiClient.setReadTimeoutMillis(timeoutMillis);
apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
apiClient.setLocalAddress(config.getLocalAddress());
apiClient.setCompress(config.isUseCompression());

View File

@ -76,8 +76,6 @@ import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
@ -90,6 +88,9 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@ -110,8 +111,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -176,11 +180,14 @@ public class SiteToSiteRestApiClient implements Closeable {
private int connectTimeoutMillis;
private int readTimeoutMillis;
private long cacheExpirationMillis = 30000L;
private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$");
private Future<HttpResponse> postResult;
private CountDownLatch transferDataLatch = new CountDownLatch(1);
private static final ConcurrentMap<String, RemoteGroupContents> contentsMap = new ConcurrentHashMap<>();
private volatile long lastPruneTimestamp = System.currentTimeMillis();
public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) {
this.sslContext = sslContext;
@ -268,7 +275,7 @@ public class SiteToSiteRestApiClient implements Closeable {
final HttpClientBuilder clientBuilder = HttpClients.custom();
if (sslContext != null) {
clientBuilder.setSslcontext(sslContext);
clientBuilder.setSSLContext(sslContext);
clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor());
}
@ -368,10 +375,48 @@ public class SiteToSiteRestApiClient implements Closeable {
}
/**
 * Returns the controller information for the remote instance at {@code baseUrl},
 * serving it from the static contents cache when a non-null entry exists that is
 * not older than {@code cacheExpirationMillis}. Otherwise the controller is fetched
 * via {@link #fetchController()}, stored in the cache and returned.
 *
 * If the fetch fails, the cache entry is replaced with a freshly timestamped entry
 * that holds the previously cached controller (or null if there was none), and the
 * exception is rethrown.
 *
 * @return the cached or newly fetched controller information
 * @throws IOException if the controller has to be fetched and the fetch fails
 */
private ControllerDTO getController() throws IOException {
// First check the cache and prune any old values.
// Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed
// from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem.
if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) {
pruneCache();
}
// String#intern returns one canonical instance per distinct URL value, so locking on it
// serializes cache lookups and refreshes for the same URL.
final String internedUrl = baseUrl.intern();
synchronized (internedUrl) {
final RemoteGroupContents groupContents = contentsMap.get(internedUrl);
if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(cacheExpirationMillis)) {
logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", internedUrl);
final ControllerDTO refreshedContents;
try {
refreshedContents = fetchController();
} catch (final Exception e) {
// We failed to refresh contents, but we don't want to constantly poll the remote instance, failing.
// So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp.
// NOTE(review): when nothing was cached before, the stored entry holds null contents, and the
// null check above sends the next call straight back to fetchController() - confirm this is intended.
final ControllerDTO existingController = groupContents == null ? null : groupContents.getContents();
final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController);
contentsMap.put(internedUrl, updatedContents);
throw e;
}
logger.debug("Successfully retrieved contents for remote group at URL {}", internedUrl);
final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents);
contentsMap.put(internedUrl, updatedContents);
return refreshedContents;
}
logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. Will return the cached value.", internedUrl);
return groupContents.getContents();
}
}
private ControllerDTO fetchController() throws IOException {
try {
final HttpGet get = createGetControllerRequest();
return execute(get, ControllerEntity.class).getController();
} catch (final HttpGetFailedException e) {
if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) {
logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url...");
@ -382,6 +427,20 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
/**
 * Removes every entry from the contents cache whose timestamp is more than five
 * minutes old, and records the time of this pruning pass in
 * {@code lastPruneTimestamp}.
 *
 * Without recording the pass, the time-based guard in the caller would stay true
 * forever once five minutes had elapsed, and the whole map would be scanned on
 * every call.
 */
private void pruneCache() {
    // Record the pass before scanning so the caller's guard stops triggering
    // for the next five minutes.
    lastPruneTimestamp = System.currentTimeMillis();

    final long maxAgeMillis = TimeUnit.MINUTES.toMillis(5);
    for (final Map.Entry<String, RemoteGroupContents> entry : contentsMap.entrySet()) {
        final RemoteGroupContents contents = entry.getValue();

        // Entries untouched for five minutes are assumed to belong to a remote group
        // that is no longer in use, so they are dropped to keep the map from growing.
        if (contents.isOlderThan(maxAgeMillis)) {
            // Two-argument remove: only removes the entry if the key is still mapped to
            // this same value, so an entry refreshed concurrently is left in place.
            contentsMap.remove(entry.getKey(), contents);
        }
    }
}
private HttpGet createGetControllerRequest() {
final HttpGet get = createGet("/site-to-site");
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
@ -1211,6 +1270,10 @@ public class SiteToSiteRestApiClient implements Closeable {
this.readTimeoutMillis = readTimeoutMillis;
}
/**
 * Sets the age beyond which cached remote controller contents are considered
 * expired and are fetched again.
 *
 * @param expirationMillis the cache expiration period, in milliseconds
 */
public void setCacheExpirationMillis(final long expirationMillis) {
    cacheExpirationMillis = expirationMillis;
}
public static String getFirstUrl(final String clusterUrlStr) {
if (clusterUrlStr == null) {
return null;
@ -1452,4 +1515,22 @@ public class SiteToSiteRestApiClient implements Closeable {
}
/**
 * Immutable cache entry pairing the controller contents of a remote group with
 * the moment the entry was created.
 *
 * The creation moment is taken from {@link System#nanoTime()} rather than the
 * wall clock, so the measured age is not affected by adjustments to the system time.
 */
private static class RemoteGroupContents {
    private final ControllerDTO contents;
    // Monotonic creation time; only meaningful as a difference against System.nanoTime().
    private final long creationNanos;

    /**
     * Creates an entry timestamped with the current time.
     *
     * @param contents the controller contents to hold; may be null
     */
    public RemoteGroupContents(final ControllerDTO contents) {
        this.contents = contents;
        this.creationNanos = System.nanoTime();
    }

    /**
     * @return the controller contents held by this entry, possibly null
     */
    public ControllerDTO getContents() {
        return contents;
    }

    /**
     * Indicates whether more than the given number of milliseconds have elapsed
     * since this entry was created.
     *
     * @param millis the age threshold, in milliseconds
     * @return true if the entry's age in whole milliseconds exceeds {@code millis}
     */
    public boolean isOlderThan(final long millis) {
        final long elapsedNanos = System.nanoTime() - creationNanos;
        // Compare in whole milliseconds so the threshold keeps millisecond granularity.
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos) > millis;
    }
}
}

View File

@ -2964,6 +2964,9 @@ Remote Process Groups can choose transport protocol from RAW and HTTP. Propertie
Whether a Site-to-Site client uses HTTP or HTTPS is determined by `nifi.remote.input.secure`. If it is set to `true`, then requests are sent as HTTPS to `nifi.web.https.port`. If set to `false`, HTTP requests are sent to `nifi.web.http.port`.
|nifi.remote.input.http.transaction.ttl|Specifies how long a transaction can stay alive on the server. By default, it is set to `30 secs`. +
If a Site-to-Site client hasn't proceeded to the next action after this period of time, the transaction is discarded from the remote NiFi instance. For example, when a client creates a transaction but doesn't send or receive flow files, or when a client sends or receives flow files but doesn't confirm that transaction.
|nifi.remote.contents.cache.expiration|Specifies how long NiFi should cache information about a remote NiFi instance when communicating via Site-to-Site. By default, NiFi will cache the +
responses from the remote system for `30 secs`. This allows NiFi to avoid constantly making HTTP requests to the remote system, which is particularly important when this instance of NiFi +
has many instances of Remote Process Groups.
|====
=== Web Properties

View File

@ -102,6 +102,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ProcessScheduler scheduler;
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
private final long remoteContentsCacheExpiration;
private final AtomicReference<String> name = new AtomicReference<>();
private final AtomicReference<Position> position = new AtomicReference<>(new Position(0D, 0D));
@ -156,6 +157,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
this.scheduler = flowController.getProcessScheduler();
this.authorizationIssue = "Establishing connection to " + targetUris;
final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs");
remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS);
final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@ -963,7 +967,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setLocalAddress(getLocalAddress());
apiClient.setCacheExpirationMillis(remoteContentsCacheExpiration);
return apiClient;
}

View File

@ -124,6 +124,7 @@ nifi.remote.input.secure=false
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
# web properties #
nifi.web.war.directory=${nifi.web.war.directory}