diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 5f6e66cd5e..0f0f878769 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -20,6 +20,8 @@ import com.burgstaller.okhttp.AuthenticationCacheInterceptor; import com.burgstaller.okhttp.CachingAuthenticatorDecorator; import com.burgstaller.okhttp.digest.CachingAuthenticator; import com.burgstaller.okhttp.digest.DigestAuthenticator; +import com.google.common.io.Files; +import okhttp3.Cache; import okhttp3.Credentials; import okhttp3.MediaType; import okhttp3.OkHttpClient; @@ -48,6 +50,7 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -70,6 +73,8 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; + +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -388,6 +393,24 @@ public final class InvokeHTTP extends AbstractProcessor { .allowableValues("true", "false") .build(); + public static final PropertyDescriptor PROP_USE_ETAG = new PropertyDescriptor.Builder() + .name("use-etag") + .description("Enable HTTP entity tag (ETag) support for HTTP requests.") + .displayName("Use HTTP ETag") + .required(true) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_ETAG_MAX_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("etag-max-cache-size") + .description("The maximum size that the ETag cache should be allowed to grow to. The default size is 10MB.") + .displayName("Maximum ETag Cache Size") + .required(true) + .defaultValue("10MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, PROP_URL, @@ -413,7 +436,9 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_CONTENT_TYPE, PROP_SEND_BODY, PROP_USE_CHUNKED_ENCODING, - PROP_PENALIZE_NO_RETRY)); + PROP_PENALIZE_NO_RETRY, + PROP_USE_ETAG, + PROP_ETAG_MAX_CACHE_SIZE)); // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() @@ -559,6 +584,13 @@ public final class InvokeHTTP extends AbstractProcessor { isHttpsProxy = HTTPS.equals(proxyType); } + // configure ETag cache if enabled + final boolean etagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean(); + if(etagEnabled) { + final int maxCacheSizeBytes = context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue(); + okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes)); + } + // Set timeouts okHttpClientBuilder.connectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); okHttpClientBuilder.readTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); @@ -718,6 +750,14 @@ public final class InvokeHTTP extends AbstractProcessor { final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); final ComponentLog logger = getLogger(); + // log ETag cache metrics + final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean(); + if(eTagEnabled && logger.isDebugEnabled()) { + final Cache cache = okHttpClient.cache(); + logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}", + new Object[] {cache.requestCount(), cache.networkCount(), cache.hitCount()}); + } + // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute. final UUID txId = UUID.randomUUID(); @@ -1142,6 +1182,19 @@ public final class InvokeHTTP extends AbstractProcessor { return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8; } + /** + * Retrieve the directory in which OkHttp should cache responses. This method opts + * to use a temp directory to write the cache, which means that the cache will be written + * to a new location each time this processor is scheduled. + * + * Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching + * + * @return the directory in which the ETag cache should be written + */ + private static File getETagCacheDir() { + return Files.createTempDir(); + } + private static class OverrideHostnameVerifier implements HostnameVerifier { private final String trustedHostname; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index 6b233d9557..5242b8f9df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -1547,6 +1547,7 @@ public abstract class TestInvokeHttpCommon { response.setContentType("text/plain"); response.setContentLength(target.length()); + response.setHeader("Cache-Control", "public,max-age=1"); try (PrintWriter writer = response.getWriter()) { writer.print(target);