diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index d5344fa856..00e51d2c02 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -66,6 +66,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -76,8 +77,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.GZIPOutputStream; import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; import org.apache.nifi.stream.io.StreamThrottler; @@ -95,6 +94,8 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response.Status; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -121,6 +122,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -449,6 +451,26 @@ public class PostHTTP extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile firstFlowFile = session.get(); + if (firstFlowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + final String url = context.getProperty(URL).evaluateAttributeExpressions(firstFlowFile).getValue(); + try { + new java.net.URL(url); + } catch (final MalformedURLException e) { + logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure", + new Object[]{firstFlowFile, url}); + firstFlowFile = session.penalize(firstFlowFile); + session.transfer(firstFlowFile, REL_FAILURE); + return; + } + + final List toSend = new ArrayList<>(); + toSend.add(firstFlowFile); + final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); final String userAgent = context.getProperty(USER_AGENT).getValue(); @@ -461,141 +483,115 @@ public class PostHTTP extends AbstractProcessor { final RequestConfig requestConfig = requestConfigBuilder.build(); final StreamThrottler throttler = throttlerRef.get(); - final ComponentLog logger = getLogger(); final Double maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B); - String lastUrl = null; - long bytesToSend = 0L; + final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize()); - final List toSend = new ArrayList<>(); DestinationAccepts destinationAccepts = null; CloseableHttpClient client = null; final String transactionId = UUID.randomUUID().toString(); final AtomicReference dnHolder = new AtomicReference<>("none"); - while (true) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - break; - } - final String url = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue(); - try { - new java.net.URL(url); - } catch (final MalformedURLException e) { - logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure", - new Object[]{flowFile, url}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - continue; - } + final Config config = getConfig(url, context); + final HttpClientConnectionManager conMan = config.getConnectionManager(); - // If this FlowFile doesn't have the same url, throw it back on the queue and stop grabbing FlowFiles - if (lastUrl != null && !lastUrl.equals(url)) { - session.transfer(flowFile); - break; - } - - lastUrl = url; - toSend.add(flowFile); - - if (client == null || destinationAccepts == null) { - final Config config = getConfig(url, context); - final HttpClientConnectionManager conMan = config.getConnectionManager(); - - final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); - clientBuilder.setConnectionManager(conMan); - clientBuilder.setUserAgent(userAgent); - clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { - @Override - public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); - final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); - if (!conn.isOpen()) { - return; - } - - final SSLSession sslSession = conn.getSSLSession(); - - if (sslSession != null) { - final Certificate[] certChain = sslSession.getPeerCertificates(); - if (certChain == null || certChain.length == 0) { - throw new SSLPeerUnverifiedException("No certificates found"); - } - - try { - final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); - dnHolder.set(cert.getSubjectDN().getName().trim()); - } catch (CertificateException e) { - final String msg = "Could not extract subject DN from SSL session peer certificate"; - logger.warn(msg); - throw new SSLPeerUnverifiedException(msg); - } - } - } - }); - - clientBuilder.disableAutomaticRetries(); - clientBuilder.disableContentCompression(); - - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); - // set the credentials if appropriate - if (username != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - if (password == null) { - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); - } else { - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - } - clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + clientBuilder.setConnectionManager(conMan); + clientBuilder.setUserAgent(userAgent); + clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { + @Override + public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); + final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); + if (!conn.isOpen()) { + return; } - // Set the proxy if specified - if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) { - final String host = context.getProperty(PROXY_HOST).getValue(); - final int port = context.getProperty(PROXY_PORT).asInteger(); - clientBuilder.setProxy(new HttpHost(host, port)); - } + final SSLSession sslSession = conn.getSSLSession(); - client = clientBuilder.build(); + if (sslSession != null) { + final Certificate[] certChain = sslSession.getPeerCertificates(); + if (certChain == null || certChain.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } - // determine whether or not destination accepts flowfile/gzip - destinationAccepts = config.getDestinationAccepts(); - if (destinationAccepts == null) { try { - destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId); - config.setDestinationAccepts(destinationAccepts); - } catch (final IOException e) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - logger.error("Unable to communicate with destination {} to determine whether or not it can accept " - + "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, flowFile, e}); - context.yield(); - return; + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); + dnHolder.set(cert.getSubjectDN().getName().trim()); + } catch (CertificateException e) { + final String msg = "Could not extract subject DN from SSL session peer certificate"; + logger.warn(msg); + throw new SSLPeerUnverifiedException(msg); } } } + }); - bytesToSend += flowFile.getSize(); - if (bytesToSend > maxBatchBytes.longValue()) { - break; + clientBuilder.disableAutomaticRetries(); + clientBuilder.disableContentCompression(); + + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + // set the credentials if appropriate + if (username != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (password == null) { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); + } else { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); } + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } - // if we are not sending as flowfile, or if the destination doesn't accept V3 or V2 (streaming) format, - // then only use a single FlowFile - if (!sendAsFlowFile || !destinationAccepts.isFlowFileV3Accepted() && !destinationAccepts.isFlowFileV2Accepted()) { - break; + // Set the proxy if specified + if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) { + final String host = context.getProperty(PROXY_HOST).getValue(); + final int port = context.getProperty(PROXY_PORT).asInteger(); + clientBuilder.setProxy(new HttpHost(host, port)); + } + + client = clientBuilder.build(); + + // determine whether or not destination accepts flowfile/gzip + destinationAccepts = config.getDestinationAccepts(); + if (destinationAccepts == null) { + try { + destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId); + config.setDestinationAccepts(destinationAccepts); + } catch (final IOException e) { + firstFlowFile = session.penalize(firstFlowFile); + session.transfer(firstFlowFile, REL_FAILURE); + logger.error("Unable to communicate with destination {} to determine whether or not it can accept " + + "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e}); + context.yield(); + return; } } - if (toSend.isEmpty()) { - return; + // if we are sending as flowfile and the destination accepts V3 or V2 (streaming) format, + // then we can get more flowfiles from the session up to MAX_BATCH_SIZE for the same URL + if (sendAsFlowFile && (destinationAccepts.isFlowFileV3Accepted() || destinationAccepts.isFlowFileV2Accepted())) { + toSend.addAll(session.get(new FlowFileFilter() { + @Override + public FlowFileFilterResult filter(FlowFile flowFile) { + // if over MAX_BATCH_SIZE, then stop adding files + if (bytesToSend.get() + flowFile.getSize() > maxBatchBytes) { + return FlowFileFilterResult.REJECT_AND_TERMINATE; + } + // check URL to see if this flowfile can be included in the batch + final String urlToCheck = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue(); + if (url.equals(urlToCheck)) { + bytesToSend.addAndGet(flowFile.getSize()); + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } else { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + } + })); } - final String url = lastUrl; final HttpPost post = new HttpPost(url); - final List flowFileList = toSend; final DestinationAccepts accepts = destinationAccepts; final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() == null; @@ -609,7 +605,7 @@ public class PostHTTP extends AbstractProcessor { } try (final OutputStream out = wrappedOut) { - for (final FlowFile flowFile : flowFileList) { + for (final FlowFile flowFile : toSend) { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { @@ -693,10 +689,10 @@ public class PostHTTP extends AbstractProcessor { } final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); - if (attributeHeaderRegex != null && !sendAsFlowFile && flowFileList.size() == 1) { + if (attributeHeaderRegex != null && !sendAsFlowFile && toSend.size() == 1) { final Pattern pattern = Pattern.compile(attributeHeaderRegex); - final Map attributes = flowFileList.get(0).getAttributes(); + final Map attributes = toSend.get(0).getAttributes(); for (final Map.Entry entry : attributes.entrySet()) { final String key = entry.getKey(); if (pattern.matcher(key).matches()) { @@ -731,7 +727,7 @@ public class PostHTTP extends AbstractProcessor { // don't do this, the Connection will not be returned to the pool EntityUtils.consume(response.getEntity()); stopWatch.stop(); - uploadDataRate = stopWatch.calculateDataRate(bytesToSend); + uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get()); uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); } catch (final IOException e) { logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index 9a46741c0d..abb19518b7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -17,18 +17,24 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.eclipse.jetty.servlet.ServletHandler; @@ -441,4 +447,116 @@ public class TestPostHTTP { Assert.assertTrue(runner.getProcessContext().getProperty(PostHTTP.USER_AGENT).getValue().startsWith("Apache-HttpClient")); } + @Test + public void testBatchWithMultipleUrls() throws Exception { + CaptureServlet servletA, servletB; + TestServer serverA, serverB; + + { // setup test servers + setup(null); + servletA = servlet; + serverA = server; + + // set up second web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(CaptureServlet.class, "/*"); + + // create the second service + serverB = new TestServer(null); + serverB.addHandler(handler); + serverB.startServer(); + + servletB = (CaptureServlet) handler.getServlets()[0].getServlet(); + } + + runner.setProperty(PostHTTP.URL, "${url}"); // use EL for the URL + runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); + runner.setProperty(PostHTTP.MAX_BATCH_SIZE, "10 b"); + + Set expectedContentA = new HashSet<>(); + Set expectedContentB = new HashSet<>(); + + Set actualContentA = new HashSet<>(); + Set actualContentB = new HashSet<>(); + + // enqueue 9 FlowFiles + for (int i = 0; i < 9; i++) { + enqueueWithURL("a" + i, serverA.getUrl()); + enqueueWithURL("b" + i, serverB.getUrl()); + + expectedContentA.add("a" + i); + expectedContentB.add("b" + i); + } + + // MAX_BATCH_SIZE is 10 bytes, each file is 2 bytes, so 18 files should produce 4 batches + for (int i = 0; i < 4; i++) { + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + final List successFiles = runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS); + assertFalse(successFiles.isEmpty()); + + MockFlowFile mff = successFiles.get(0); + final String urlAttr = mff.getAttribute("url"); + + if (serverA.getUrl().equals(urlAttr)) { + checkBatch(serverA, servletA, actualContentA, (actualContentA.isEmpty() ? 5 : 4)); + } else if (serverB.getUrl().equals(urlAttr)) { + checkBatch(serverB, servletB, actualContentB, (actualContentB.isEmpty() ? 5 : 4)); + } else { + fail("unexpected url attribute"); + } + } + + assertEquals(expectedContentA, actualContentA); + assertEquals(expectedContentB, actualContentB); + + // make sure everything transferred, nothing more to do + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 0); + } + + private void enqueueWithURL(String data, String url) { + final Map attrs = new HashMap<>(); + attrs.put("url", url); + runner.enqueue(data.getBytes(), attrs); + } + + private void checkBatch(TestServer server, CaptureServlet servlet, Set actualContent, int expectedCount) throws Exception { + FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); + Set actualFFContent = new HashSet<>(); + Set actualPostContent = new HashSet<>(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, expectedCount); + + // confirm that all FlowFiles transferred to 'success' have the same URL + // also accumulate content to verify later + final List successFlowFiles = runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS); + for (int i = 0; i < expectedCount; i++) { + MockFlowFile mff = successFlowFiles.get(i); + mff.assertAttributeEquals("url", server.getUrl()); + String content = new String(mff.toByteArray()); + actualFFContent.add(content); + } + + // confirm that all FlowFiles POSTed to server have the same URL + // also accumulate content to verify later + try (ByteArrayInputStream bais = new ByteArrayInputStream(servlet.getLastPost()); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + for (int i = 0; i < expectedCount; i++) { + Map receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + String receivedContent = new String(baos.toByteArray()); + actualPostContent.add(receivedContent); + assertEquals(server.getUrl(), receivedAttrs.get("url")); + assertTrue(unpacker.hasMoreData() || i == (expectedCount - 1)); + baos.reset(); + } + } + + // confirm that the transferred and POSTed content match + assertEquals(actualFFContent, actualPostContent); + + // accumulate actial content + actualContent.addAll(actualPostContent); + runner.clearTransferState(); + } }