mirror of https://github.com/apache/nifi.git
NIFI-4699 Use a filter in PostHTTP to pull flowfiles from queue whose URL is the same
This closes #2412.
This commit is contained in:
parent
16e56ccfca
commit
a2f2ddd6b8
|
@ -66,6 +66,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.processor.FlowFileFilter;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
@ -76,8 +77,6 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.security.util.CertificateUtils;
|
import org.apache.nifi.security.util.CertificateUtils;
|
||||||
import org.apache.nifi.security.util.KeyStoreUtils;
|
import org.apache.nifi.security.util.KeyStoreUtils;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
|
||||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
|
||||||
import org.apache.nifi.stream.io.GZIPOutputStream;
|
import org.apache.nifi.stream.io.GZIPOutputStream;
|
||||||
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
|
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
|
||||||
import org.apache.nifi.stream.io.StreamThrottler;
|
import org.apache.nifi.stream.io.StreamThrottler;
|
||||||
|
@ -95,6 +94,8 @@ import javax.net.ssl.SSLPeerUnverifiedException;
|
||||||
import javax.net.ssl.SSLSession;
|
import javax.net.ssl.SSLSession;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -121,6 +122,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -449,6 +451,26 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
FlowFile firstFlowFile = session.get();
|
||||||
|
if (firstFlowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ComponentLog logger = getLogger();
|
||||||
|
final String url = context.getProperty(URL).evaluateAttributeExpressions(firstFlowFile).getValue();
|
||||||
|
try {
|
||||||
|
new java.net.URL(url);
|
||||||
|
} catch (final MalformedURLException e) {
|
||||||
|
logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure",
|
||||||
|
new Object[]{firstFlowFile, url});
|
||||||
|
firstFlowFile = session.penalize(firstFlowFile);
|
||||||
|
session.transfer(firstFlowFile, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<FlowFile> toSend = new ArrayList<>();
|
||||||
|
toSend.add(firstFlowFile);
|
||||||
|
|
||||||
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
|
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
|
||||||
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
|
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
|
||||||
final String userAgent = context.getProperty(USER_AGENT).getValue();
|
final String userAgent = context.getProperty(USER_AGENT).getValue();
|
||||||
|
@ -461,141 +483,115 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
final RequestConfig requestConfig = requestConfigBuilder.build();
|
final RequestConfig requestConfig = requestConfigBuilder.build();
|
||||||
|
|
||||||
final StreamThrottler throttler = throttlerRef.get();
|
final StreamThrottler throttler = throttlerRef.get();
|
||||||
final ComponentLog logger = getLogger();
|
|
||||||
|
|
||||||
final Double maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B);
|
final Double maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B);
|
||||||
String lastUrl = null;
|
final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
|
||||||
long bytesToSend = 0L;
|
|
||||||
|
|
||||||
final List<FlowFile> toSend = new ArrayList<>();
|
|
||||||
DestinationAccepts destinationAccepts = null;
|
DestinationAccepts destinationAccepts = null;
|
||||||
CloseableHttpClient client = null;
|
CloseableHttpClient client = null;
|
||||||
final String transactionId = UUID.randomUUID().toString();
|
final String transactionId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
final AtomicReference<String> dnHolder = new AtomicReference<>("none");
|
final AtomicReference<String> dnHolder = new AtomicReference<>("none");
|
||||||
while (true) {
|
|
||||||
FlowFile flowFile = session.get();
|
|
||||||
if (flowFile == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String url = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue();
|
final Config config = getConfig(url, context);
|
||||||
try {
|
final HttpClientConnectionManager conMan = config.getConnectionManager();
|
||||||
new java.net.URL(url);
|
|
||||||
} catch (final MalformedURLException e) {
|
|
||||||
logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure",
|
|
||||||
new Object[]{flowFile, url});
|
|
||||||
flowFile = session.penalize(flowFile);
|
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this FlowFile doesn't have the same url, throw it back on the queue and stop grabbing FlowFiles
|
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
|
||||||
if (lastUrl != null && !lastUrl.equals(url)) {
|
clientBuilder.setConnectionManager(conMan);
|
||||||
session.transfer(flowFile);
|
clientBuilder.setUserAgent(userAgent);
|
||||||
break;
|
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
|
||||||
}
|
@Override
|
||||||
|
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
|
||||||
lastUrl = url;
|
final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
|
||||||
toSend.add(flowFile);
|
final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
|
||||||
|
if (!conn.isOpen()) {
|
||||||
if (client == null || destinationAccepts == null) {
|
return;
|
||||||
final Config config = getConfig(url, context);
|
|
||||||
final HttpClientConnectionManager conMan = config.getConnectionManager();
|
|
||||||
|
|
||||||
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
|
|
||||||
clientBuilder.setConnectionManager(conMan);
|
|
||||||
clientBuilder.setUserAgent(userAgent);
|
|
||||||
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
|
|
||||||
@Override
|
|
||||||
public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
|
|
||||||
final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
|
|
||||||
final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
|
|
||||||
if (!conn.isOpen()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final SSLSession sslSession = conn.getSSLSession();
|
|
||||||
|
|
||||||
if (sslSession != null) {
|
|
||||||
final Certificate[] certChain = sslSession.getPeerCertificates();
|
|
||||||
if (certChain == null || certChain.length == 0) {
|
|
||||||
throw new SSLPeerUnverifiedException("No certificates found");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
|
|
||||||
dnHolder.set(cert.getSubjectDN().getName().trim());
|
|
||||||
} catch (CertificateException e) {
|
|
||||||
final String msg = "Could not extract subject DN from SSL session peer certificate";
|
|
||||||
logger.warn(msg);
|
|
||||||
throw new SSLPeerUnverifiedException(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
clientBuilder.disableAutomaticRetries();
|
|
||||||
clientBuilder.disableContentCompression();
|
|
||||||
|
|
||||||
final String username = context.getProperty(USERNAME).getValue();
|
|
||||||
final String password = context.getProperty(PASSWORD).getValue();
|
|
||||||
// set the credentials if appropriate
|
|
||||||
if (username != null) {
|
|
||||||
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
||||||
if (password == null) {
|
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
|
|
||||||
} else {
|
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
|
||||||
}
|
|
||||||
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the proxy if specified
|
final SSLSession sslSession = conn.getSSLSession();
|
||||||
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
|
|
||||||
final String host = context.getProperty(PROXY_HOST).getValue();
|
|
||||||
final int port = context.getProperty(PROXY_PORT).asInteger();
|
|
||||||
clientBuilder.setProxy(new HttpHost(host, port));
|
|
||||||
}
|
|
||||||
|
|
||||||
client = clientBuilder.build();
|
if (sslSession != null) {
|
||||||
|
final Certificate[] certChain = sslSession.getPeerCertificates();
|
||||||
|
if (certChain == null || certChain.length == 0) {
|
||||||
|
throw new SSLPeerUnverifiedException("No certificates found");
|
||||||
|
}
|
||||||
|
|
||||||
// determine whether or not destination accepts flowfile/gzip
|
|
||||||
destinationAccepts = config.getDestinationAccepts();
|
|
||||||
if (destinationAccepts == null) {
|
|
||||||
try {
|
try {
|
||||||
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId);
|
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
|
||||||
config.setDestinationAccepts(destinationAccepts);
|
dnHolder.set(cert.getSubjectDN().getName().trim());
|
||||||
} catch (final IOException e) {
|
} catch (CertificateException e) {
|
||||||
flowFile = session.penalize(flowFile);
|
final String msg = "Could not extract subject DN from SSL session peer certificate";
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
logger.warn(msg);
|
||||||
logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
|
throw new SSLPeerUnverifiedException(msg);
|
||||||
+ "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, flowFile, e});
|
|
||||||
context.yield();
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
bytesToSend += flowFile.getSize();
|
clientBuilder.disableAutomaticRetries();
|
||||||
if (bytesToSend > maxBatchBytes.longValue()) {
|
clientBuilder.disableContentCompression();
|
||||||
break;
|
|
||||||
|
final String username = context.getProperty(USERNAME).getValue();
|
||||||
|
final String password = context.getProperty(PASSWORD).getValue();
|
||||||
|
// set the credentials if appropriate
|
||||||
|
if (username != null) {
|
||||||
|
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
if (password == null) {
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
|
||||||
|
} else {
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||||
}
|
}
|
||||||
|
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
|
}
|
||||||
|
|
||||||
// if we are not sending as flowfile, or if the destination doesn't accept V3 or V2 (streaming) format,
|
// Set the proxy if specified
|
||||||
// then only use a single FlowFile
|
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
|
||||||
if (!sendAsFlowFile || !destinationAccepts.isFlowFileV3Accepted() && !destinationAccepts.isFlowFileV2Accepted()) {
|
final String host = context.getProperty(PROXY_HOST).getValue();
|
||||||
break;
|
final int port = context.getProperty(PROXY_PORT).asInteger();
|
||||||
|
clientBuilder.setProxy(new HttpHost(host, port));
|
||||||
|
}
|
||||||
|
|
||||||
|
client = clientBuilder.build();
|
||||||
|
|
||||||
|
// determine whether or not destination accepts flowfile/gzip
|
||||||
|
destinationAccepts = config.getDestinationAccepts();
|
||||||
|
if (destinationAccepts == null) {
|
||||||
|
try {
|
||||||
|
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId);
|
||||||
|
config.setDestinationAccepts(destinationAccepts);
|
||||||
|
} catch (final IOException e) {
|
||||||
|
firstFlowFile = session.penalize(firstFlowFile);
|
||||||
|
session.transfer(firstFlowFile, REL_FAILURE);
|
||||||
|
logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
|
||||||
|
+ "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e});
|
||||||
|
context.yield();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toSend.isEmpty()) {
|
// if we are sending as flowfile and the destination accepts V3 or V2 (streaming) format,
|
||||||
return;
|
// then we can get more flowfiles from the session up to MAX_BATCH_SIZE for the same URL
|
||||||
|
if (sendAsFlowFile && (destinationAccepts.isFlowFileV3Accepted() || destinationAccepts.isFlowFileV2Accepted())) {
|
||||||
|
toSend.addAll(session.get(new FlowFileFilter() {
|
||||||
|
@Override
|
||||||
|
public FlowFileFilterResult filter(FlowFile flowFile) {
|
||||||
|
// if over MAX_BATCH_SIZE, then stop adding files
|
||||||
|
if (bytesToSend.get() + flowFile.getSize() > maxBatchBytes) {
|
||||||
|
return FlowFileFilterResult.REJECT_AND_TERMINATE;
|
||||||
|
}
|
||||||
|
// check URL to see if this flowfile can be included in the batch
|
||||||
|
final String urlToCheck = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
if (url.equals(urlToCheck)) {
|
||||||
|
bytesToSend.addAndGet(flowFile.getSize());
|
||||||
|
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
|
||||||
|
} else {
|
||||||
|
return FlowFileFilterResult.REJECT_AND_CONTINUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
final String url = lastUrl;
|
|
||||||
final HttpPost post = new HttpPost(url);
|
final HttpPost post = new HttpPost(url);
|
||||||
final List<FlowFile> flowFileList = toSend;
|
|
||||||
final DestinationAccepts accepts = destinationAccepts;
|
final DestinationAccepts accepts = destinationAccepts;
|
||||||
final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() == null;
|
final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() == null;
|
||||||
|
|
||||||
|
@ -609,7 +605,7 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final OutputStream out = wrappedOut) {
|
try (final OutputStream out = wrappedOut) {
|
||||||
for (final FlowFile flowFile : flowFileList) {
|
for (final FlowFile flowFile : toSend) {
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
public void process(final InputStream rawIn) throws IOException {
|
||||||
|
@ -693,10 +689,10 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
|
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
|
||||||
if (attributeHeaderRegex != null && !sendAsFlowFile && flowFileList.size() == 1) {
|
if (attributeHeaderRegex != null && !sendAsFlowFile && toSend.size() == 1) {
|
||||||
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
|
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
|
||||||
|
|
||||||
final Map<String, String> attributes = flowFileList.get(0).getAttributes();
|
final Map<String, String> attributes = toSend.get(0).getAttributes();
|
||||||
for (final Map.Entry<String, String> entry : attributes.entrySet()) {
|
for (final Map.Entry<String, String> entry : attributes.entrySet()) {
|
||||||
final String key = entry.getKey();
|
final String key = entry.getKey();
|
||||||
if (pattern.matcher(key).matches()) {
|
if (pattern.matcher(key).matches()) {
|
||||||
|
@ -731,7 +727,7 @@ public class PostHTTP extends AbstractProcessor {
|
||||||
// don't do this, the Connection will not be returned to the pool
|
// don't do this, the Connection will not be returned to the pool
|
||||||
EntityUtils.consume(response.getEntity());
|
EntityUtils.consume(response.getEntity());
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
uploadDataRate = stopWatch.calculateDataRate(bytesToSend);
|
uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
|
||||||
uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
|
logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
|
||||||
|
|
|
@ -17,18 +17,24 @@
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.ssl.StandardSSLContextService;
|
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||||
import org.apache.nifi.util.FlowFileUnpackagerV3;
|
import org.apache.nifi.util.FlowFileUnpackagerV3;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.eclipse.jetty.servlet.ServletHandler;
|
import org.eclipse.jetty.servlet.ServletHandler;
|
||||||
|
@ -441,4 +447,116 @@ public class TestPostHTTP {
|
||||||
Assert.assertTrue(runner.getProcessContext().getProperty(PostHTTP.USER_AGENT).getValue().startsWith("Apache-HttpClient"));
|
Assert.assertTrue(runner.getProcessContext().getProperty(PostHTTP.USER_AGENT).getValue().startsWith("Apache-HttpClient"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBatchWithMultipleUrls() throws Exception {
|
||||||
|
CaptureServlet servletA, servletB;
|
||||||
|
TestServer serverA, serverB;
|
||||||
|
|
||||||
|
{ // setup test servers
|
||||||
|
setup(null);
|
||||||
|
servletA = servlet;
|
||||||
|
serverA = server;
|
||||||
|
|
||||||
|
// set up second web service
|
||||||
|
ServletHandler handler = new ServletHandler();
|
||||||
|
handler.addServletWithMapping(CaptureServlet.class, "/*");
|
||||||
|
|
||||||
|
// create the second service
|
||||||
|
serverB = new TestServer(null);
|
||||||
|
serverB.addHandler(handler);
|
||||||
|
serverB.startServer();
|
||||||
|
|
||||||
|
servletB = (CaptureServlet) handler.getServlets()[0].getServlet();
|
||||||
|
}
|
||||||
|
|
||||||
|
runner.setProperty(PostHTTP.URL, "${url}"); // use EL for the URL
|
||||||
|
runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true");
|
||||||
|
runner.setProperty(PostHTTP.MAX_BATCH_SIZE, "10 b");
|
||||||
|
|
||||||
|
Set<String> expectedContentA = new HashSet<>();
|
||||||
|
Set<String> expectedContentB = new HashSet<>();
|
||||||
|
|
||||||
|
Set<String> actualContentA = new HashSet<>();
|
||||||
|
Set<String> actualContentB = new HashSet<>();
|
||||||
|
|
||||||
|
// enqueue 9 FlowFiles
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
enqueueWithURL("a" + i, serverA.getUrl());
|
||||||
|
enqueueWithURL("b" + i, serverB.getUrl());
|
||||||
|
|
||||||
|
expectedContentA.add("a" + i);
|
||||||
|
expectedContentB.add("b" + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
// MAX_BATCH_SIZE is 10 bytes, each file is 2 bytes, so 18 files should produce 4 batches
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
runner.run(1);
|
||||||
|
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS);
|
||||||
|
assertFalse(successFiles.isEmpty());
|
||||||
|
|
||||||
|
MockFlowFile mff = successFiles.get(0);
|
||||||
|
final String urlAttr = mff.getAttribute("url");
|
||||||
|
|
||||||
|
if (serverA.getUrl().equals(urlAttr)) {
|
||||||
|
checkBatch(serverA, servletA, actualContentA, (actualContentA.isEmpty() ? 5 : 4));
|
||||||
|
} else if (serverB.getUrl().equals(urlAttr)) {
|
||||||
|
checkBatch(serverB, servletB, actualContentB, (actualContentB.isEmpty() ? 5 : 4));
|
||||||
|
} else {
|
||||||
|
fail("unexpected url attribute");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(expectedContentA, actualContentA);
|
||||||
|
assertEquals(expectedContentB, actualContentB);
|
||||||
|
|
||||||
|
// make sure everything transferred, nothing more to do
|
||||||
|
runner.run(1);
|
||||||
|
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enqueueWithURL(String data, String url) {
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("url", url);
|
||||||
|
runner.enqueue(data.getBytes(), attrs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkBatch(TestServer server, CaptureServlet servlet, Set<String> actualContent, int expectedCount) throws Exception {
|
||||||
|
FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3();
|
||||||
|
Set<String> actualFFContent = new HashSet<>();
|
||||||
|
Set<String> actualPostContent = new HashSet<>();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, expectedCount);
|
||||||
|
|
||||||
|
// confirm that all FlowFiles transferred to 'success' have the same URL
|
||||||
|
// also accumulate content to verify later
|
||||||
|
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS);
|
||||||
|
for (int i = 0; i < expectedCount; i++) {
|
||||||
|
MockFlowFile mff = successFlowFiles.get(i);
|
||||||
|
mff.assertAttributeEquals("url", server.getUrl());
|
||||||
|
String content = new String(mff.toByteArray());
|
||||||
|
actualFFContent.add(content);
|
||||||
|
}
|
||||||
|
|
||||||
|
// confirm that all FlowFiles POSTed to server have the same URL
|
||||||
|
// also accumulate content to verify later
|
||||||
|
try (ByteArrayInputStream bais = new ByteArrayInputStream(servlet.getLastPost());
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||||
|
for (int i = 0; i < expectedCount; i++) {
|
||||||
|
Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
|
||||||
|
String receivedContent = new String(baos.toByteArray());
|
||||||
|
actualPostContent.add(receivedContent);
|
||||||
|
assertEquals(server.getUrl(), receivedAttrs.get("url"));
|
||||||
|
assertTrue(unpacker.hasMoreData() || i == (expectedCount - 1));
|
||||||
|
baos.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// confirm that the transferred and POSTed content match
|
||||||
|
assertEquals(actualFFContent, actualPostContent);
|
||||||
|
|
||||||
|
// accumulate actial content
|
||||||
|
actualContent.addAll(actualPostContent);
|
||||||
|
runner.clearTransferState();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue