diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 52c9fdf4a0..ee28c0cd09 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -26,7 +26,10 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetAddress; import java.net.MalformedURLException; +import java.net.UnknownHostException; +import java.security.Principal; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -53,31 +56,31 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; +import javax.security.auth.x500.X500Principal; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response.Status; -import org.apache.commons.io.IOUtils; import org.apache.http.Header; import org.apache.http.HttpException; import org.apache.http.HttpResponse; import org.apache.http.HttpResponseInterceptor; +import org.apache.http.NoHttpResponseException; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.HttpClient; +import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; -import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.config.SocketConfig; import org.apache.http.conn.ManagedHttpClientConnection; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.conn.ssl.SSLContextBuilder; -import org.apache.http.conn.ssl.SSLContexts; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentProducer; import org.apache.http.entity.EntityTemplate; @@ -87,6 +90,8 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; import org.apache.http.util.EntityUtils; import org.apache.http.util.VersionInfo; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -133,7 +138,8 @@ import org.apache.nifi.util.StringUtils; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"http", "https", "remote", "copy", "archive"}) @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile. " - + "Uses a connection pool with max number of connections equal to its Concurrent Tasks configuration.") + + "Uses a connection pool with max number of connections equal to " + + "the number of possible endpoints multiplied by the Concurrent Tasks configuration.") public class PostHTTP extends AbstractProcessor { public static final String CONTENT_TYPE_HEADER = "Content-Type"; @@ -154,6 +160,7 @@ public class PostHTTP extends AbstractProcessor { public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version"; public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id"; public static final String PROTOCOL_VERSION = "3"; + public static final String REMOTE_DN = "remote.dn"; public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() .name("URL") @@ -267,9 +274,15 @@ public class PostHTTP extends AbstractProcessor { private Set relationships; private List properties; - private final AtomicReference acceptsRef = new AtomicReference<>(); private final AtomicReference throttlerRef = new AtomicReference<>(); - private final ConcurrentMap configMap = new ConcurrentHashMap<>(); + private final ConcurrentMap destinationAcceptsMap = new ConcurrentHashMap<>(); + private volatile PoolingHttpClientConnectionManager connManager; + private volatile CloseableHttpClient client; + private volatile RequestConfig requestConfig; + + // this is used when creating thet HttpContext, which is a thread local variable that is used by + // HTTPClient to obtain an available, reusable connection + private volatile Principal principal; @Override protected void init(final ProcessorInitializationContext context) { @@ -335,15 +348,15 @@ public class PostHTTP extends AbstractProcessor { @OnStopped public void onStopped() { - this.acceptsRef.set(null); + destinationAcceptsMap.clear(); - for (final Map.Entry entry : configMap.entrySet()) { - final Config config = entry.getValue(); - config.getConnectionManager().shutdown(); + try { + connManager.shutdown(); + client.close(); + } catch (IOException e) { + getLogger().error("Could not properly shutdown connections", e); } - configMap.clear(); - final StreamThrottler throttler = throttlerRef.getAndSet(null); if (throttler != null) { try { @@ -358,28 +371,18 @@ public class PostHTTP extends AbstractProcessor { public void onScheduled(final ProcessContext context) { final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue())); - } - private String getBaseUrl(final String url) { - final int index = url.indexOf("/", 9); - if (index < 0) { - return url; - } + String hostname = "unknown"; + try { + hostname = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException ignore) {} + principal = new X500Principal("CN=" + hostname + ", OU=unknown, O=unknown, C=unknown"); - return url.substring(0, index); - } - - private Config getConfig(final String url, final ProcessContext context) { - final String baseUrl = getBaseUrl(url); - Config config = configMap.get(baseUrl); - if (config != null) { - return config; - } - - final PoolingHttpClientConnectionManager conMan; + // setup the PoolingHttpClientConnectionManager final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); if (sslContextService == null) { - conMan = new PoolingHttpClientConnectionManager(); + connManager = new PoolingHttpClientConnectionManager(); + } else { final SSLContext sslContext; try { @@ -397,15 +400,110 @@ public class PostHTTP extends AbstractProcessor { .register("http", PlainConnectionSocketFactory.getSocketFactory()) .build(); - conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); } - conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks()); - conMan.setMaxTotal(context.getMaxConcurrentTasks()); - config = new Config(conMan); - final Config existingConfig = configMap.putIfAbsent(baseUrl, config); + // setup SocketConfig + SocketConfig.Builder socketConfigBuilder = SocketConfig.custom(); + socketConfigBuilder.setSoTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + SocketConfig socketConfig = socketConfigBuilder.build(); + connManager.setDefaultSocketConfig(socketConfig); - return existingConfig == null ? config : existingConfig; + // the +1 here accommodates math error calculating excess connections in AbstractConnPool.getPoolEntryBlocking() + connManager.setDefaultMaxPerRoute(context.getMaxConcurrentTasks() + 1); + // max total connections will get set in onTrigger(), because a new route will require increasing this + connManager.setMaxTotal(1); + // enable inactivity check, to detect and close idle connections + connManager.setValidateAfterInactivity(30_000); + + // setup the HttpClientBuilder + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + clientBuilder.setConnectionManager(connManager); + clientBuilder.setUserAgent(context.getProperty(USER_AGENT).getValue()); + clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { + @Override + public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); + final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); + if (!conn.isOpen()) { + return; + } + + final SSLSession sslSession = conn.getSSLSession(); + + if (sslSession != null) { + final Certificate[] certChain = sslSession.getPeerCertificates(); + if (certChain == null || certChain.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } + + try { + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); + httpContext.setAttribute(REMOTE_DN, cert.getSubjectDN().getName().trim()); + } catch (CertificateException e) { + final String msg = "Could not extract subject DN from SSL session peer certificate"; + getLogger().warn(msg); + throw new SSLPeerUnverifiedException(msg); + } + } + } + }); + + HttpRequestRetryHandler retryHandler = (exception, attempt, httpContext) -> { + if (attempt > 3 || !isScheduled()) { + return false; + } + final HttpClientContext clientContext = HttpClientContext.adapt(httpContext); + // A heavily loaded remote listener can manifest as NoHttpResponseExceptions here. + // When this happens, take a 5 second snooze before retrying to give the remote a short break. + if (exception instanceof NoHttpResponseException) { + if (getLogger().isDebugEnabled()) { + getLogger().debug("Sleeping for 5 secs then retrying {} request for remote server {}", + new Object[]{clientContext.getRequest().getRequestLine().getMethod(), clientContext.getTargetHost()}); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + return false; + } + return true; + } + // do not retry more serious exceptions + return false; + }; + clientBuilder.setRetryHandler(retryHandler); + clientBuilder.disableContentCompression(); + + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + // set the credentials if appropriate + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + if (username != null) { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + } + + // Set the proxy if specified + HTTPUtils.setProxy(context, clientBuilder, credentialsProvider); + + // complete the HTTPClient build + client = clientBuilder.build(); + + // setup RequestConfig + final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setRedirectsEnabled(false); + requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfig = requestConfigBuilder.build(); + } + + private String getBaseUrl(final String url) { + final int index = url.indexOf("/", 9); + if (index < 0) { + return url; + } + return url.substring(0, index); } private SSLContext createSSLContext(final SSLContextService service) @@ -427,9 +525,14 @@ public class PostHTTP extends AbstractProcessor { keystore.load(in, service.getKeyStorePassword().toCharArray()); } builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()); + final String alias = keystore.aliases().nextElement(); + final Certificate cert = keystore.getCertificate(alias); + if (cert instanceof X509Certificate) { + principal = ((X509Certificate) cert).getSubjectDN(); + } } - builder = builder.useProtocol(service.getSslAlgorithm()); + builder = builder.setProtocol(service.getSslAlgorithm()); final SSLContext sslContext = builder.build(); return sslContext; @@ -459,14 +562,6 @@ public class PostHTTP extends AbstractProcessor { final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); - final String userAgent = context.getProperty(USER_AGENT).getValue(); - - final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); - requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - requestConfigBuilder.setRedirectsEnabled(false); - requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - final RequestConfig requestConfig = requestConfigBuilder.build(); final StreamThrottler throttler = throttlerRef.get(); @@ -474,75 +569,25 @@ public class PostHTTP extends AbstractProcessor { final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize()); DestinationAccepts destinationAccepts = null; - CloseableHttpClient client = null; final String transactionId = UUID.randomUUID().toString(); - - final AtomicReference dnHolder = new AtomicReference<>("none"); - - final Config config = getConfig(url, context); - final HttpClientConnectionManager conMan = config.getConnectionManager(); - - final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); - clientBuilder.setConnectionManager(conMan); - clientBuilder.setUserAgent(userAgent); - clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { - @Override - public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); - final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); - if (!conn.isOpen()) { - return; - } - - final SSLSession sslSession = conn.getSSLSession(); - - if (sslSession != null) { - final Certificate[] certChain = sslSession.getPeerCertificates(); - if (certChain == null || certChain.length == 0) { - throw new SSLPeerUnverifiedException("No certificates found"); - } - - try { - final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); - dnHolder.set(cert.getSubjectDN().getName().trim()); - } catch (CertificateException e) { - final String msg = "Could not extract subject DN from SSL session peer certificate"; - logger.warn(msg); - throw new SSLPeerUnverifiedException(msg); - } - } - } - }); - - clientBuilder.disableAutomaticRetries(); - clientBuilder.disableContentCompression(); - - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); - // set the credentials if appropriate - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - clientBuilder.setDefaultCredentialsProvider(credentialsProvider); - if (username != null) { - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - } - - // Set the proxy if specified - HTTPUtils.setProxy(context, clientBuilder, credentialsProvider); - - client = clientBuilder.build(); + final HttpClientContext httpClientContext = HttpClientContext.create(); + httpClientContext.setUserToken(principal); // determine whether or not destination accepts flowfile/gzip - destinationAccepts = config.getDestinationAccepts(); + final String baseUrl = getBaseUrl(url); + destinationAccepts = destinationAcceptsMap.get(baseUrl); if (destinationAccepts == null) { try { - destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId); - config.setDestinationAccepts(destinationAccepts); + destinationAccepts = getDestinationAcceptance(sendAsFlowFile, url, transactionId, httpClientContext); + if (null == destinationAcceptsMap.putIfAbsent(baseUrl, destinationAccepts)) { + // url indicates a new route, so increase the max allowed open connections + connManager.setMaxTotal(connManager.getMaxTotal() + connManager.getDefaultMaxPerRoute()); + } } catch (final IOException e) { firstFlowFile = session.penalize(firstFlowFile); session.transfer(firstFlowFile, REL_FAILURE); logger.error("Unable to communicate with destination {} to determine whether or not it can accept " + "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e}); - context.yield(); return; } } @@ -583,23 +628,25 @@ public class PostHTTP extends AbstractProcessor { } try (final OutputStream out = wrappedOut) { + final FlowFilePackager packager; + if (!sendAsFlowFile) { + packager = null; + } else if (accepts.isFlowFileV3Accepted()) { + packager = new FlowFilePackagerV3(); + } else if (accepts.isFlowFileV2Accepted()) { + packager = new FlowFilePackagerV2(); + } else if (accepts.isFlowFileV1Accepted()) { + packager = new FlowFilePackagerV1(); + } else { + packager = null; + } + for (final FlowFile flowFile : toSend) { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { - FlowFilePackager packager = null; - if (!sendAsFlowFile) { - packager = null; - } else if (accepts.isFlowFileV3Accepted()) { - packager = new FlowFilePackagerV3(); - } else if (accepts.isFlowFileV2Accepted()) { - packager = new FlowFilePackagerV2(); - } else if (accepts.isFlowFileV1Accepted()) { - packager = new FlowFilePackagerV1(); - } - // if none of the above conditions is met, we should never get here, because // we will have already verified that at least 1 of the FlowFile packaging // formats is acceptable if sending as FlowFile. @@ -625,6 +672,15 @@ public class PostHTTP extends AbstractProcessor { } out.flush(); + } catch (ProcessException pe) { + // Pull out IOExceptions so that HTTPClient can properly do what it needs to do + Throwable t = pe.getCause(); + if (t != null && t instanceof IOException) { + IOException ioe = new IOException(t.getMessage()); + ioe.setStackTrace(t.getStackTrace()); + throw ioe; + } + throw pe; } } }) { @@ -639,6 +695,8 @@ public class PostHTTP extends AbstractProcessor { } }; + final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles"; + if (context.getProperty(CHUNKED_ENCODING).isSet()) { entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); } @@ -654,11 +712,13 @@ public class PostHTTP extends AbstractProcessor { } else if (accepts.isFlowFileV1Accepted()) { contentType = APPLICATION_FLOW_FILE_V1; } else { - logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is " - + "configured to deliver FlowFiles; rolling back session", new Object[]{url}); - session.rollback(); - context.yield(); - IOUtils.closeQuietly(client); + logger.error("Cannot send {} to {} because the destination does not accept FlowFiles and this processor is " + + "configured to deliver FlowFiles; routing to failure", + new Object[] {flowFileDescription, url}); + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } return; } } else { @@ -692,24 +752,17 @@ public class PostHTTP extends AbstractProcessor { } // Do the actual POST - final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles"; - final String uploadDataRate; final long uploadMillis; CloseableHttpResponse response = null; try { final StopWatch stopWatch = new StopWatch(true); - response = client.execute(post); - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - EntityUtils.consume(response.getEntity()); + response = client.execute(post, httpClientContext); stopWatch.stop(); uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get()); uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - } catch (final IOException e) { + } catch (final IOException | ProcessException e) { logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e}); - context.yield(); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); @@ -718,9 +771,10 @@ public class PostHTTP extends AbstractProcessor { } finally { if (response != null) { try { - response.close(); - } catch (final IOException e) { - getLogger().warn("Failed to close HTTP Response due to {}", new Object[]{e}); + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + EntityUtils.consume(response.getEntity()); + } catch (final IOException ignore) { } } } @@ -744,10 +798,10 @@ public class PostHTTP extends AbstractProcessor { } if (holdUri == null) { + logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI", + new Object[]{flowFileDescription, url, responseCode, responseReason}); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI", - new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } return; @@ -756,22 +810,20 @@ public class PostHTTP extends AbstractProcessor { if (holdUri == null) { if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) { + logger.error("Failed to Post {} to {}: response code was {}:{}", + new Object[]{flowFileDescription, url, responseCode, responseReason}); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, " - + "since the destination is temporarily unavailable", - new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } - context.yield(); return; } if (responseCode >= 300) { + logger.error("Failed to Post {} to {}: response code was {}:{}", + new Object[]{flowFileDescription, url, responseCode, responseReason}); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: response code was {}:{}", - new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } return; @@ -781,7 +833,7 @@ public class PostHTTP extends AbstractProcessor { new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate}); for (final FlowFile flowFile : toSend) { - session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + dnHolder.get(), uploadMillis, true); + session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true); session.transfer(flowFile, REL_SUCCESS); } return; @@ -815,54 +867,58 @@ public class PostHTTP extends AbstractProcessor { final HttpDelete delete = new HttpDelete(fullHoldUri); delete.setHeader(TRANSACTION_ID_HEADER, transactionId); + delete.setConfig(requestConfig); - while (true) { - try { - final HttpResponse holdResponse = client.execute(delete); - EntityUtils.consume(holdResponse.getEntity()); - final int holdStatusCode = holdResponse.getStatusLine().getStatusCode(); - final String holdReason = holdResponse.getStatusLine().getReasonPhrase(); - if (holdStatusCode >= 300) { - logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure", - new Object[]{flowFileDescription, holdStatusCode, holdReason}); + HttpResponse holdResponse = null; + try { + holdResponse = client.execute(delete, httpClientContext); + final int holdStatusCode = holdResponse.getStatusLine().getStatusCode(); + final String holdReason = holdResponse.getStatusLine().getReasonPhrase(); + if (holdStatusCode >= 300) { + logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure", + new Object[]{flowFileDescription, holdStatusCode, holdReason}); - for (FlowFile flowFile : toSend) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - return; - } - - logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate}); - - for (final FlowFile flowFile : toSend) { - session.getProvenanceReporter().send(flowFile, url); - session.transfer(flowFile, REL_SUCCESS); - } - return; - } catch (final IOException e) { - logger.warn("Failed to delete Hold that destination placed on {} due to {}", new Object[]{flowFileDescription, e}); - } - - if (!isScheduled()) { - context.yield(); - logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription}); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } return; } + + logger.info("Successfully Posted {} to {} in {} at a rate of {}", + new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate}); + + for (final FlowFile flowFile : toSend) { + session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true); + session.transfer(flowFile, REL_SUCCESS); + } + return; + + } catch (final IOException e) { + logger.warn("Failed to delete Hold that destination placed on {} due to {}; routing to failure", new Object[]{flowFileDescription, e}); + for (FlowFile flowFile : toSend) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } finally { + if (null != holdResponse) { + try { + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + EntityUtils.consume(holdResponse.getEntity()); + } catch (IOException ignore) {} + } } } - private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final HttpClient client, final String uri, - final ComponentLog logger, final String transactionId) throws IOException { + private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final String uri, final String transactionId, final HttpContext httpContext) throws IOException { final HttpHead head = new HttpHead(uri); + head.setConfig(requestConfig); if (sendAsFlowFile) { head.addHeader(TRANSACTION_ID_HEADER, transactionId); } - final HttpResponse response = client.execute(head); + + final HttpResponse response = client.execute(head, httpContext); // we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile // otherwise, we do not bother to make any determinations concerning this compatibility @@ -901,12 +957,14 @@ public class PostHTTP extends AbstractProcessor { } } - if (acceptsFlowFileV3) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile"); - } else if (acceptsFlowFileV2) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile"); - } else if (acceptsFlowFileV1) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile"); + if (getLogger().isDebugEnabled()) { + if (acceptsFlowFileV3) { + getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV2) { + getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV1) { + getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile"); + } } } @@ -921,15 +979,17 @@ public class PostHTTP extends AbstractProcessor { } } - if (acceptsGzip) { - logger.debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported"); - } else { - logger.debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression"); + if (getLogger().isDebugEnabled()) { + if (acceptsGzip) { + getLogger().debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported"); + } else { + getLogger().debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression"); + } } return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion); } else { - logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of " + getLogger().warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of " + statusCode + ": " + response.getStatusLine().getReasonPhrase()); return new DestinationAccepts(false, false, false, false, null); } @@ -971,26 +1031,4 @@ public class PostHTTP extends AbstractProcessor { return protocolVersion; } } - - private static class Config { - - private volatile DestinationAccepts destinationAccepts; - private final HttpClientConnectionManager conMan; - - public Config(final HttpClientConnectionManager conMan) { - this.conMan = conMan; - } - - public DestinationAccepts getDestinationAccepts() { - return this.destinationAccepts; - } - - public void setDestinationAccepts(final DestinationAccepts destinationAccepts) { - this.destinationAccepts = destinationAccepts; - } - - public HttpClientConnectionManager getConnectionManager() { - return conMan; - } - } }