NIFI-3402 - Added etag support to InvokeHTTP

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2150.
This commit is contained in:
m-hogue 2017-09-12 14:51:19 -04:00 committed by Pierre Villard
parent 7aabb99bcc
commit 6471f66acd
2 changed files with 55 additions and 1 deletions

View File

@ -20,6 +20,8 @@ import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;
import okhttp3.Cache;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@ -48,6 +50,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -70,6 +73,8 @@ import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -388,6 +393,24 @@ public final class InvokeHTTP extends AbstractProcessor {
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_USE_ETAG = new PropertyDescriptor.Builder()
.name("use-etag")
.description("Enable HTTP entity tag (ETag) support for HTTP requests.")
.displayName("Use HTTP ETag")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_ETAG_MAX_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("etag-max-cache-size")
.description("The maximum size that the ETag cache should be allowed to grow to. The default size is 10MB.")
.displayName("Maximum ETag Cache Size")
.required(true)
.defaultValue("10MB")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
@ -413,7 +436,9 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_CONTENT_TYPE,
PROP_SEND_BODY,
PROP_USE_CHUNKED_ENCODING,
PROP_PENALIZE_NO_RETRY));
PROP_PENALIZE_NO_RETRY,
PROP_USE_ETAG,
PROP_ETAG_MAX_CACHE_SIZE));
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
@ -559,6 +584,13 @@ public final class InvokeHTTP extends AbstractProcessor {
isHttpsProxy = HTTPS.equals(proxyType);
}
// configure ETag cache if enabled
final boolean etagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
if(etagEnabled) {
final int maxCacheSizeBytes = context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue();
okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes));
}
// Set timeouts
okHttpClientBuilder.connectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
okHttpClientBuilder.readTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
@ -718,6 +750,14 @@ public final class InvokeHTTP extends AbstractProcessor {
final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
final ComponentLog logger = getLogger();
// log ETag cache metrics
final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
if(eTagEnabled && logger.isDebugEnabled()) {
final Cache cache = okHttpClient.cache();
logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}",
new Object[] {cache.requestCount(), cache.networkCount(), cache.hitCount()});
}
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
final UUID txId = UUID.randomUUID();
@ -1142,6 +1182,19 @@ public final class InvokeHTTP extends AbstractProcessor {
return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8;
}
/**
* Retrieve the directory in which OkHttp should cache responses. This method opts
* to use a temp directory to write the cache, which means that the cache will be written
* to a new location each time this processor is scheduled.
*
* Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching
*
* @return the directory in which the ETag cache should be written
*/
private static File getETagCacheDir() {
return Files.createTempDir();
}
private static class OverrideHostnameVerifier implements HostnameVerifier {
private final String trustedHostname;

View File

@ -1547,6 +1547,7 @@ public abstract class TestInvokeHttpCommon {
response.setContentType("text/plain");
response.setContentLength(target.length());
response.setHeader("Cache-Control", "public,max-age=1");
try (PrintWriter writer = response.getWriter()) {
writer.print(target);