mirror of https://github.com/apache/nifi.git
NIFI-5275 PostHTTP SocketConfig setup, fixed connection pool when using HTTPS, setup idle connection checker, setup request retry handler, improved some exception handling and logging, and NIFI-1336 fix as well
This closes #2796. Signed-off-by: Brandon Devries <devriesb@apache.org>
This commit is contained in:
parent
cfc858c901
commit
8c0705cb6b
|
@ -26,7 +26,10 @@ import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.Principal;
|
||||||
import java.security.KeyManagementException;
|
import java.security.KeyManagementException;
|
||||||
import java.security.KeyStore;
|
import java.security.KeyStore;
|
||||||
import java.security.KeyStoreException;
|
import java.security.KeyStoreException;
|
||||||
|
@ -53,31 +56,31 @@ import java.util.regex.Pattern;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLPeerUnverifiedException;
|
import javax.net.ssl.SSLPeerUnverifiedException;
|
||||||
import javax.net.ssl.SSLSession;
|
import javax.net.ssl.SSLSession;
|
||||||
|
import javax.security.auth.x500.X500Principal;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.http.Header;
|
import org.apache.http.Header;
|
||||||
import org.apache.http.HttpException;
|
import org.apache.http.HttpException;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
import org.apache.http.HttpResponseInterceptor;
|
import org.apache.http.HttpResponseInterceptor;
|
||||||
|
import org.apache.http.NoHttpResponseException;
|
||||||
import org.apache.http.auth.AuthScope;
|
import org.apache.http.auth.AuthScope;
|
||||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||||
import org.apache.http.client.CredentialsProvider;
|
import org.apache.http.client.CredentialsProvider;
|
||||||
import org.apache.http.client.HttpClient;
|
import org.apache.http.client.HttpRequestRetryHandler;
|
||||||
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.client.config.RequestConfig;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpDelete;
|
import org.apache.http.client.methods.HttpDelete;
|
||||||
import org.apache.http.client.methods.HttpHead;
|
import org.apache.http.client.methods.HttpHead;
|
||||||
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
import org.apache.http.client.protocol.HttpClientContext;
|
||||||
import org.apache.http.config.Registry;
|
import org.apache.http.config.Registry;
|
||||||
import org.apache.http.config.RegistryBuilder;
|
import org.apache.http.config.RegistryBuilder;
|
||||||
import org.apache.http.conn.HttpClientConnectionManager;
|
import org.apache.http.config.SocketConfig;
|
||||||
import org.apache.http.conn.ManagedHttpClientConnection;
|
import org.apache.http.conn.ManagedHttpClientConnection;
|
||||||
import org.apache.http.conn.socket.ConnectionSocketFactory;
|
import org.apache.http.conn.socket.ConnectionSocketFactory;
|
||||||
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
|
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
|
||||||
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||||
import org.apache.http.conn.ssl.SSLContextBuilder;
|
|
||||||
import org.apache.http.conn.ssl.SSLContexts;
|
|
||||||
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
|
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
|
||||||
import org.apache.http.entity.ContentProducer;
|
import org.apache.http.entity.ContentProducer;
|
||||||
import org.apache.http.entity.EntityTemplate;
|
import org.apache.http.entity.EntityTemplate;
|
||||||
|
@ -87,6 +90,8 @@ import org.apache.http.impl.client.HttpClientBuilder;
|
||||||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||||
import org.apache.http.protocol.HttpContext;
|
import org.apache.http.protocol.HttpContext;
|
||||||
import org.apache.http.protocol.HttpCoreContext;
|
import org.apache.http.protocol.HttpCoreContext;
|
||||||
|
import org.apache.http.ssl.SSLContextBuilder;
|
||||||
|
import org.apache.http.ssl.SSLContexts;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.apache.http.util.VersionInfo;
|
import org.apache.http.util.VersionInfo;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
@ -133,7 +138,8 @@ import org.apache.nifi.util.StringUtils;
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"http", "https", "remote", "copy", "archive"})
|
@Tags({"http", "https", "remote", "copy", "archive"})
|
||||||
@CapabilityDescription("Performs an HTTP Post with the content of the FlowFile. "
|
@CapabilityDescription("Performs an HTTP Post with the content of the FlowFile. "
|
||||||
+ "Uses a connection pool with max number of connections equal to its Concurrent Tasks configuration.")
|
+ "Uses a connection pool with max number of connections equal to "
|
||||||
|
+ "the number of possible endpoints multiplied by the Concurrent Tasks configuration.")
|
||||||
public class PostHTTP extends AbstractProcessor {
|
public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
public static final String CONTENT_TYPE_HEADER = "Content-Type";
|
public static final String CONTENT_TYPE_HEADER = "Content-Type";
|
||||||
|
@ -154,6 +160,7 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
|
public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
|
||||||
public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
|
public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
|
||||||
public static final String PROTOCOL_VERSION = "3";
|
public static final String PROTOCOL_VERSION = "3";
|
||||||
|
public static final String REMOTE_DN = "remote.dn";
|
||||||
|
|
||||||
public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
|
||||||
.name("URL")
|
.name("URL")
|
||||||
|
@ -267,9 +274,15 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
private Set<Relationship> relationships;
|
private Set<Relationship> relationships;
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
private final AtomicReference<DestinationAccepts> acceptsRef = new AtomicReference<>();
|
|
||||||
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
|
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
|
||||||
private final ConcurrentMap<String, Config> configMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, DestinationAccepts> destinationAcceptsMap = new ConcurrentHashMap<>();
|
||||||
|
private volatile PoolingHttpClientConnectionManager connManager;
|
||||||
|
private volatile CloseableHttpClient client;
|
||||||
|
private volatile RequestConfig requestConfig;
|
||||||
|
|
||||||
|
// this is used when creating thet HttpContext, which is a thread local variable that is used by
|
||||||
|
// HTTPClient to obtain an available, reusable connection
|
||||||
|
private volatile Principal principal;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
@ -335,15 +348,15 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
@OnStopped
|
@OnStopped
|
||||||
public void onStopped() {
|
public void onStopped() {
|
||||||
this.acceptsRef.set(null);
|
destinationAcceptsMap.clear();
|
||||||
|
|
||||||
for (final Map.Entry<String, Config> entry : configMap.entrySet()) {
|
try {
|
||||||
final Config config = entry.getValue();
|
connManager.shutdown();
|
||||||
config.getConnectionManager().shutdown();
|
client.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
getLogger().error("Could not properly shutdown connections", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
configMap.clear();
|
|
||||||
|
|
||||||
final StreamThrottler throttler = throttlerRef.getAndSet(null);
|
final StreamThrottler throttler = throttlerRef.getAndSet(null);
|
||||||
if (throttler != null) {
|
if (throttler != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -358,28 +371,18 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
public void onScheduled(final ProcessContext context) {
|
public void onScheduled(final ProcessContext context) {
|
||||||
final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
|
final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
|
||||||
this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue()));
|
this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue()));
|
||||||
}
|
|
||||||
|
|
||||||
private String getBaseUrl(final String url) {
|
String hostname = "unknown";
|
||||||
final int index = url.indexOf("/", 9);
|
try {
|
||||||
if (index < 0) {
|
hostname = InetAddress.getLocalHost().getCanonicalHostName();
|
||||||
return url;
|
} catch (UnknownHostException ignore) {}
|
||||||
}
|
principal = new X500Principal("CN=" + hostname + ", OU=unknown, O=unknown, C=unknown");
|
||||||
|
|
||||||
return url.substring(0, index);
|
// setup the PoolingHttpClientConnectionManager
|
||||||
}
|
|
||||||
|
|
||||||
private Config getConfig(final String url, final ProcessContext context) {
|
|
||||||
final String baseUrl = getBaseUrl(url);
|
|
||||||
Config config = configMap.get(baseUrl);
|
|
||||||
if (config != null) {
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
|
|
||||||
final PoolingHttpClientConnectionManager conMan;
|
|
||||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||||
if (sslContextService == null) {
|
if (sslContextService == null) {
|
||||||
conMan = new PoolingHttpClientConnectionManager();
|
connManager = new PoolingHttpClientConnectionManager();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
final SSLContext sslContext;
|
final SSLContext sslContext;
|
||||||
try {
|
try {
|
||||||
|
@ -397,15 +400,110 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
.register("http", PlainConnectionSocketFactory.getSocketFactory())
|
.register("http", PlainConnectionSocketFactory.getSocketFactory())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
|
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
|
||||||
}
|
}
|
||||||
|
|
||||||
conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks());
|
// setup SocketConfig
|
||||||
conMan.setMaxTotal(context.getMaxConcurrentTasks());
|
SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
|
||||||
config = new Config(conMan);
|
socketConfigBuilder.setSoTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
final Config existingConfig = configMap.putIfAbsent(baseUrl, config);
|
SocketConfig socketConfig = socketConfigBuilder.build();
|
||||||
|
connManager.setDefaultSocketConfig(socketConfig);
|
||||||
|
|
||||||
return existingConfig == null ? config : existingConfig;
|
// the +1 here accommodates math error calculating excess connections in AbstractConnPool.getPoolEntryBlocking()
|
||||||
|
connManager.setDefaultMaxPerRoute(context.getMaxConcurrentTasks() + 1);
|
||||||
|
// max total connections will get set in onTrigger(), because a new route will require increasing this
|
||||||
|
connManager.setMaxTotal(1);
|
||||||
|
// enable inactivity check, to detect and close idle connections
|
||||||
|
connManager.setValidateAfterInactivity(30_000);
|
||||||
|
|
||||||
|
// setup the HttpClientBuilder
|
||||||
|
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
|
||||||
|
clientBuilder.setConnectionManager(connManager);
|
||||||
|
clientBuilder.setUserAgent(context.getProperty(USER_AGENT).getValue());
|
||||||
|
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
|
||||||
|
@Override
|
||||||
|
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
|
||||||
|
final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
|
||||||
|
final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
|
||||||
|
if (!conn.isOpen()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final SSLSession sslSession = conn.getSSLSession();
|
||||||
|
|
||||||
|
if (sslSession != null) {
|
||||||
|
final Certificate[] certChain = sslSession.getPeerCertificates();
|
||||||
|
if (certChain == null || certChain.length == 0) {
|
||||||
|
throw new SSLPeerUnverifiedException("No certificates found");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
|
||||||
|
httpContext.setAttribute(REMOTE_DN, cert.getSubjectDN().getName().trim());
|
||||||
|
} catch (CertificateException e) {
|
||||||
|
final String msg = "Could not extract subject DN from SSL session peer certificate";
|
||||||
|
getLogger().warn(msg);
|
||||||
|
throw new SSLPeerUnverifiedException(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpRequestRetryHandler retryHandler = (exception, attempt, httpContext) -> {
|
||||||
|
if (attempt > 3 || !isScheduled()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
final HttpClientContext clientContext = HttpClientContext.adapt(httpContext);
|
||||||
|
// A heavily loaded remote listener can manifest as NoHttpResponseExceptions here.
|
||||||
|
// When this happens, take a 5 second snooze before retrying to give the remote a short break.
|
||||||
|
if (exception instanceof NoHttpResponseException) {
|
||||||
|
if (getLogger().isDebugEnabled()) {
|
||||||
|
getLogger().debug("Sleeping for 5 secs then retrying {} request for remote server {}",
|
||||||
|
new Object[]{clientContext.getRequest().getRequestLine().getMethod(), clientContext.getTargetHost()});
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// do not retry more serious exceptions
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
clientBuilder.setRetryHandler(retryHandler);
|
||||||
|
clientBuilder.disableContentCompression();
|
||||||
|
|
||||||
|
final String username = context.getProperty(USERNAME).getValue();
|
||||||
|
final String password = context.getProperty(PASSWORD).getValue();
|
||||||
|
// set the credentials if appropriate
|
||||||
|
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
|
if (username != null) {
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the proxy if specified
|
||||||
|
HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
|
||||||
|
|
||||||
|
// complete the HTTPClient build
|
||||||
|
client = clientBuilder.build();
|
||||||
|
|
||||||
|
// setup RequestConfig
|
||||||
|
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
|
||||||
|
requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfigBuilder.setRedirectsEnabled(false);
|
||||||
|
requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfig = requestConfigBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBaseUrl(final String url) {
|
||||||
|
final int index = url.indexOf("/", 9);
|
||||||
|
if (index < 0) {
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
return url.substring(0, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SSLContext createSSLContext(final SSLContextService service)
|
private SSLContext createSSLContext(final SSLContextService service)
|
||||||
|
@ -427,9 +525,14 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
keystore.load(in, service.getKeyStorePassword().toCharArray());
|
keystore.load(in, service.getKeyStorePassword().toCharArray());
|
||||||
}
|
}
|
||||||
builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
|
builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
|
||||||
|
final String alias = keystore.aliases().nextElement();
|
||||||
|
final Certificate cert = keystore.getCertificate(alias);
|
||||||
|
if (cert instanceof X509Certificate) {
|
||||||
|
principal = ((X509Certificate) cert).getSubjectDN();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
builder = builder.useProtocol(service.getSslAlgorithm());
|
builder = builder.setProtocol(service.getSslAlgorithm());
|
||||||
|
|
||||||
final SSLContext sslContext = builder.build();
|
final SSLContext sslContext = builder.build();
|
||||||
return sslContext;
|
return sslContext;
|
||||||
|
@ -459,14 +562,6 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
|
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
|
||||||
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
|
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
|
||||||
final String userAgent = context.getProperty(USER_AGENT).getValue();
|
|
||||||
|
|
||||||
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
|
|
||||||
requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
|
||||||
requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
|
||||||
requestConfigBuilder.setRedirectsEnabled(false);
|
|
||||||
requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
|
||||||
final RequestConfig requestConfig = requestConfigBuilder.build();
|
|
||||||
|
|
||||||
final StreamThrottler throttler = throttlerRef.get();
|
final StreamThrottler throttler = throttlerRef.get();
|
||||||
|
|
||||||
|
@ -474,75 +569,25 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
|
final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
|
||||||
|
|
||||||
DestinationAccepts destinationAccepts = null;
|
DestinationAccepts destinationAccepts = null;
|
||||||
CloseableHttpClient client = null;
|
|
||||||
final String transactionId = UUID.randomUUID().toString();
|
final String transactionId = UUID.randomUUID().toString();
|
||||||
|
final HttpClientContext httpClientContext = HttpClientContext.create();
|
||||||
final AtomicReference<String> dnHolder = new AtomicReference<>("none");
|
httpClientContext.setUserToken(principal);
|
||||||
|
|
||||||
final Config config = getConfig(url, context);
|
|
||||||
final HttpClientConnectionManager conMan = config.getConnectionManager();
|
|
||||||
|
|
||||||
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
|
|
||||||
clientBuilder.setConnectionManager(conMan);
|
|
||||||
clientBuilder.setUserAgent(userAgent);
|
|
||||||
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
|
|
||||||
@Override
|
|
||||||
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
|
|
||||||
final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
|
|
||||||
final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
|
|
||||||
if (!conn.isOpen()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final SSLSession sslSession = conn.getSSLSession();
|
|
||||||
|
|
||||||
if (sslSession != null) {
|
|
||||||
final Certificate[] certChain = sslSession.getPeerCertificates();
|
|
||||||
if (certChain == null || certChain.length == 0) {
|
|
||||||
throw new SSLPeerUnverifiedException("No certificates found");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
|
|
||||||
dnHolder.set(cert.getSubjectDN().getName().trim());
|
|
||||||
} catch (CertificateException e) {
|
|
||||||
final String msg = "Could not extract subject DN from SSL session peer certificate";
|
|
||||||
logger.warn(msg);
|
|
||||||
throw new SSLPeerUnverifiedException(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
clientBuilder.disableAutomaticRetries();
|
|
||||||
clientBuilder.disableContentCompression();
|
|
||||||
|
|
||||||
final String username = context.getProperty(USERNAME).getValue();
|
|
||||||
final String password = context.getProperty(PASSWORD).getValue();
|
|
||||||
// set the credentials if appropriate
|
|
||||||
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
||||||
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
||||||
if (username != null) {
|
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the proxy if specified
|
|
||||||
HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
|
|
||||||
|
|
||||||
client = clientBuilder.build();
|
|
||||||
|
|
||||||
// determine whether or not destination accepts flowfile/gzip
|
// determine whether or not destination accepts flowfile/gzip
|
||||||
destinationAccepts = config.getDestinationAccepts();
|
final String baseUrl = getBaseUrl(url);
|
||||||
|
destinationAccepts = destinationAcceptsMap.get(baseUrl);
|
||||||
if (destinationAccepts == null) {
|
if (destinationAccepts == null) {
|
||||||
try {
|
try {
|
||||||
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId);
|
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, url, transactionId, httpClientContext);
|
||||||
config.setDestinationAccepts(destinationAccepts);
|
if (null == destinationAcceptsMap.putIfAbsent(baseUrl, destinationAccepts)) {
|
||||||
|
// url indicates a new route, so increase the max allowed open connections
|
||||||
|
connManager.setMaxTotal(connManager.getMaxTotal() + connManager.getDefaultMaxPerRoute());
|
||||||
|
}
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
firstFlowFile = session.penalize(firstFlowFile);
|
firstFlowFile = session.penalize(firstFlowFile);
|
||||||
session.transfer(firstFlowFile, REL_FAILURE);
|
session.transfer(firstFlowFile, REL_FAILURE);
|
||||||
logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
|
logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
|
||||||
+ "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e});
|
+ "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e});
|
||||||
context.yield();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -583,23 +628,25 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final OutputStream out = wrappedOut) {
|
try (final OutputStream out = wrappedOut) {
|
||||||
|
final FlowFilePackager packager;
|
||||||
|
if (!sendAsFlowFile) {
|
||||||
|
packager = null;
|
||||||
|
} else if (accepts.isFlowFileV3Accepted()) {
|
||||||
|
packager = new FlowFilePackagerV3();
|
||||||
|
} else if (accepts.isFlowFileV2Accepted()) {
|
||||||
|
packager = new FlowFilePackagerV2();
|
||||||
|
} else if (accepts.isFlowFileV1Accepted()) {
|
||||||
|
packager = new FlowFilePackagerV1();
|
||||||
|
} else {
|
||||||
|
packager = null;
|
||||||
|
}
|
||||||
|
|
||||||
for (final FlowFile flowFile : toSend) {
|
for (final FlowFile flowFile : toSend) {
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
public void process(final InputStream rawIn) throws IOException {
|
||||||
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
||||||
|
|
||||||
FlowFilePackager packager = null;
|
|
||||||
if (!sendAsFlowFile) {
|
|
||||||
packager = null;
|
|
||||||
} else if (accepts.isFlowFileV3Accepted()) {
|
|
||||||
packager = new FlowFilePackagerV3();
|
|
||||||
} else if (accepts.isFlowFileV2Accepted()) {
|
|
||||||
packager = new FlowFilePackagerV2();
|
|
||||||
} else if (accepts.isFlowFileV1Accepted()) {
|
|
||||||
packager = new FlowFilePackagerV1();
|
|
||||||
}
|
|
||||||
|
|
||||||
// if none of the above conditions is met, we should never get here, because
|
// if none of the above conditions is met, we should never get here, because
|
||||||
// we will have already verified that at least 1 of the FlowFile packaging
|
// we will have already verified that at least 1 of the FlowFile packaging
|
||||||
// formats is acceptable if sending as FlowFile.
|
// formats is acceptable if sending as FlowFile.
|
||||||
|
@ -625,6 +672,15 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
out.flush();
|
out.flush();
|
||||||
|
} catch (ProcessException pe) {
|
||||||
|
// Pull out IOExceptions so that HTTPClient can properly do what it needs to do
|
||||||
|
Throwable t = pe.getCause();
|
||||||
|
if (t != null && t instanceof IOException) {
|
||||||
|
IOException ioe = new IOException(t.getMessage());
|
||||||
|
ioe.setStackTrace(t.getStackTrace());
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
throw pe;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}) {
|
}) {
|
||||||
|
@ -639,6 +695,8 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles";
|
||||||
|
|
||||||
if (context.getProperty(CHUNKED_ENCODING).isSet()) {
|
if (context.getProperty(CHUNKED_ENCODING).isSet()) {
|
||||||
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
|
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
|
||||||
}
|
}
|
||||||
|
@ -654,11 +712,13 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
} else if (accepts.isFlowFileV1Accepted()) {
|
} else if (accepts.isFlowFileV1Accepted()) {
|
||||||
contentType = APPLICATION_FLOW_FILE_V1;
|
contentType = APPLICATION_FLOW_FILE_V1;
|
||||||
} else {
|
} else {
|
||||||
logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is "
|
logger.error("Cannot send {} to {} because the destination does not accept FlowFiles and this processor is "
|
||||||
+ "configured to deliver FlowFiles; rolling back session", new Object[]{url});
|
+ "configured to deliver FlowFiles; routing to failure",
|
||||||
session.rollback();
|
new Object[] {flowFileDescription, url});
|
||||||
context.yield();
|
for (FlowFile flowFile : toSend) {
|
||||||
IOUtils.closeQuietly(client);
|
flowFile = session.penalize(flowFile);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -692,24 +752,17 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do the actual POST
|
// Do the actual POST
|
||||||
final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles";
|
|
||||||
|
|
||||||
final String uploadDataRate;
|
final String uploadDataRate;
|
||||||
final long uploadMillis;
|
final long uploadMillis;
|
||||||
CloseableHttpResponse response = null;
|
CloseableHttpResponse response = null;
|
||||||
try {
|
try {
|
||||||
final StopWatch stopWatch = new StopWatch(true);
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
response = client.execute(post);
|
response = client.execute(post, httpClientContext);
|
||||||
|
|
||||||
// consume input stream entirely, ignoring its contents. If we
|
|
||||||
// don't do this, the Connection will not be returned to the pool
|
|
||||||
EntityUtils.consume(response.getEntity());
|
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
|
uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
|
||||||
uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException | ProcessException e) {
|
||||||
logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
|
logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
|
||||||
context.yield();
|
|
||||||
for (FlowFile flowFile : toSend) {
|
for (FlowFile flowFile : toSend) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
@ -718,9 +771,10 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
} finally {
|
} finally {
|
||||||
if (response != null) {
|
if (response != null) {
|
||||||
try {
|
try {
|
||||||
response.close();
|
// consume input stream entirely, ignoring its contents. If we
|
||||||
} catch (final IOException e) {
|
// don't do this, the Connection will not be returned to the pool
|
||||||
getLogger().warn("Failed to close HTTP Response due to {}", new Object[]{e});
|
EntityUtils.consume(response.getEntity());
|
||||||
|
} catch (final IOException ignore) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -744,10 +798,10 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (holdUri == null) {
|
if (holdUri == null) {
|
||||||
|
logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI",
|
||||||
|
new Object[]{flowFileDescription, url, responseCode, responseReason});
|
||||||
for (FlowFile flowFile : toSend) {
|
for (FlowFile flowFile : toSend) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI",
|
|
||||||
new Object[]{flowFile, url, responseCode, responseReason});
|
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -756,22 +810,20 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
if (holdUri == null) {
|
if (holdUri == null) {
|
||||||
if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) {
|
if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) {
|
||||||
|
logger.error("Failed to Post {} to {}: response code was {}:{}",
|
||||||
|
new Object[]{flowFileDescription, url, responseCode, responseReason});
|
||||||
for (FlowFile flowFile : toSend) {
|
for (FlowFile flowFile : toSend) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, "
|
|
||||||
+ "since the destination is temporarily unavailable",
|
|
||||||
new Object[]{flowFile, url, responseCode, responseReason});
|
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
context.yield();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (responseCode >= 300) {
|
if (responseCode >= 300) {
|
||||||
|
logger.error("Failed to Post {} to {}: response code was {}:{}",
|
||||||
|
new Object[]{flowFileDescription, url, responseCode, responseReason});
|
||||||
for (FlowFile flowFile : toSend) {
|
for (FlowFile flowFile : toSend) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
logger.error("Failed to Post {} to {}: response code was {}:{}",
|
|
||||||
new Object[]{flowFile, url, responseCode, responseReason});
|
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -781,7 +833,7 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
|
new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
|
||||||
|
|
||||||
for (final FlowFile flowFile : toSend) {
|
for (final FlowFile flowFile : toSend) {
|
||||||
session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + dnHolder.get(), uploadMillis, true);
|
session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -815,54 +867,58 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
final HttpDelete delete = new HttpDelete(fullHoldUri);
|
final HttpDelete delete = new HttpDelete(fullHoldUri);
|
||||||
delete.setHeader(TRANSACTION_ID_HEADER, transactionId);
|
delete.setHeader(TRANSACTION_ID_HEADER, transactionId);
|
||||||
|
delete.setConfig(requestConfig);
|
||||||
|
|
||||||
while (true) {
|
HttpResponse holdResponse = null;
|
||||||
try {
|
try {
|
||||||
final HttpResponse holdResponse = client.execute(delete);
|
holdResponse = client.execute(delete, httpClientContext);
|
||||||
EntityUtils.consume(holdResponse.getEntity());
|
final int holdStatusCode = holdResponse.getStatusLine().getStatusCode();
|
||||||
final int holdStatusCode = holdResponse.getStatusLine().getStatusCode();
|
final String holdReason = holdResponse.getStatusLine().getReasonPhrase();
|
||||||
final String holdReason = holdResponse.getStatusLine().getReasonPhrase();
|
if (holdStatusCode >= 300) {
|
||||||
if (holdStatusCode >= 300) {
|
logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure",
|
||||||
logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure",
|
new Object[]{flowFileDescription, holdStatusCode, holdReason});
|
||||||
new Object[]{flowFileDescription, holdStatusCode, holdReason});
|
|
||||||
|
|
||||||
for (FlowFile flowFile : toSend) {
|
|
||||||
flowFile = session.penalize(flowFile);
|
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate});
|
|
||||||
|
|
||||||
for (final FlowFile flowFile : toSend) {
|
|
||||||
session.getProvenanceReporter().send(flowFile, url);
|
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
} catch (final IOException e) {
|
|
||||||
logger.warn("Failed to delete Hold that destination placed on {} due to {}", new Object[]{flowFileDescription, e});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isScheduled()) {
|
|
||||||
context.yield();
|
|
||||||
logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription});
|
|
||||||
for (FlowFile flowFile : toSend) {
|
for (FlowFile flowFile : toSend) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info("Successfully Posted {} to {} in {} at a rate of {}",
|
||||||
|
new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
|
||||||
|
|
||||||
|
for (final FlowFile flowFile : toSend) {
|
||||||
|
session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
} catch (final IOException e) {
|
||||||
|
logger.warn("Failed to delete Hold that destination placed on {} due to {}; routing to failure", new Object[]{flowFileDescription, e});
|
||||||
|
for (FlowFile flowFile : toSend) {
|
||||||
|
flowFile = session.penalize(flowFile);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (null != holdResponse) {
|
||||||
|
try {
|
||||||
|
// consume input stream entirely, ignoring its contents. If we
|
||||||
|
// don't do this, the Connection will not be returned to the pool
|
||||||
|
EntityUtils.consume(holdResponse.getEntity());
|
||||||
|
} catch (IOException ignore) {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final HttpClient client, final String uri,
|
private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final String uri, final String transactionId, final HttpContext httpContext) throws IOException {
|
||||||
final ComponentLog logger, final String transactionId) throws IOException {
|
|
||||||
final HttpHead head = new HttpHead(uri);
|
final HttpHead head = new HttpHead(uri);
|
||||||
|
head.setConfig(requestConfig);
|
||||||
if (sendAsFlowFile) {
|
if (sendAsFlowFile) {
|
||||||
head.addHeader(TRANSACTION_ID_HEADER, transactionId);
|
head.addHeader(TRANSACTION_ID_HEADER, transactionId);
|
||||||
}
|
}
|
||||||
final HttpResponse response = client.execute(head);
|
|
||||||
|
final HttpResponse response = client.execute(head, httpContext);
|
||||||
|
|
||||||
// we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile
|
// we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile
|
||||||
// otherwise, we do not bother to make any determinations concerning this compatibility
|
// otherwise, we do not bother to make any determinations concerning this compatibility
|
||||||
|
@ -901,12 +957,14 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (acceptsFlowFileV3) {
|
if (getLogger().isDebugEnabled()) {
|
||||||
logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
|
if (acceptsFlowFileV3) {
|
||||||
} else if (acceptsFlowFileV2) {
|
getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
|
||||||
logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
|
} else if (acceptsFlowFileV2) {
|
||||||
} else if (acceptsFlowFileV1) {
|
getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
|
||||||
logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
|
} else if (acceptsFlowFileV1) {
|
||||||
|
getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -921,15 +979,17 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (acceptsGzip) {
|
if (getLogger().isDebugEnabled()) {
|
||||||
logger.debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported");
|
if (acceptsGzip) {
|
||||||
} else {
|
getLogger().debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported");
|
||||||
logger.debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression");
|
} else {
|
||||||
|
getLogger().debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion);
|
return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of "
|
getLogger().warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of "
|
||||||
+ statusCode + ": " + response.getStatusLine().getReasonPhrase());
|
+ statusCode + ": " + response.getStatusLine().getReasonPhrase());
|
||||||
return new DestinationAccepts(false, false, false, false, null);
|
return new DestinationAccepts(false, false, false, false, null);
|
||||||
}
|
}
|
||||||
|
@ -971,26 +1031,4 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
return protocolVersion;
|
return protocolVersion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Config {
|
|
||||||
|
|
||||||
private volatile DestinationAccepts destinationAccepts;
|
|
||||||
private final HttpClientConnectionManager conMan;
|
|
||||||
|
|
||||||
public Config(final HttpClientConnectionManager conMan) {
|
|
||||||
this.conMan = conMan;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DestinationAccepts getDestinationAccepts() {
|
|
||||||
return this.destinationAccepts;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDestinationAccepts(final DestinationAccepts destinationAccepts) {
|
|
||||||
this.destinationAccepts = destinationAccepts;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HttpClientConnectionManager getConnectionManager() {
|
|
||||||
return conMan;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue