From 7bd2c64adb6a6abb012861d40d84b30841043b23 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 11 May 2016 08:08:58 +0900 Subject: [PATCH] NIFI-1865 Close StreamThrottler when processor stops. - Also, replaced copyrighted sample texts for tests Signed-off-by: Matt Burgess --- .../nifi/processors/standard/ListenHTTP.java | 12 +++++ .../nifi/processors/standard/PostHTTP.java | 9 ++++ .../processors/standard/TestPostHTTP.java | 47 +++++++++++++++---- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 88b66667c8..5ea116e706 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -136,6 +137,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { private volatile Server server = null; private final ConcurrentMap flowFileMap = new ConcurrentHashMap<>(); private final AtomicReference sessionFactoryReference = new AtomicReference<>(); + private final AtomicReference throttlerRef = new AtomicReference<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -166,6 +168,15 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { @OnStopped public void shutdownHttpServer() { + final StreamThrottler throttler = throttlerRef.getAndSet(null); + if(throttler != null) { + try { + throttler.close(); + } catch (IOException e) { + getLogger().error("Failed to close StreamThrottler", e); + } + } + final Server toShutdown = this.server; if (toShutdown == null) { return; @@ -185,6 +196,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); + throttlerRef.set(streamThrottler); final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 230790abcf..79d281512c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -351,6 +351,15 @@ public class PostHTTP extends AbstractProcessor { } configMap.clear(); + + final StreamThrottler throttler = throttlerRef.getAndSet(null); + if(throttler != null) { + try { + throttler.close(); + } catch (IOException e) { + getLogger().error("Failed to close StreamThrottler", e); + } + } } @OnScheduled diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index 52629648c1..edff3b4bb4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -261,7 +261,7 @@ public class TestPostHTTP { final String suppliedMimeType = "text/plain"; attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); - runner.enqueue("Camping is in tents.".getBytes(), attrs); + runner.enqueue("Camping is great!".getBytes(), attrs); runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false"); runner.run(1); @@ -269,7 +269,7 @@ public class TestPostHTTP { Map lastPostHeaders = servlet.getLastPostHeaders(); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); - Assert.assertEquals("20",lastPostHeaders.get("Content-Length")); + Assert.assertEquals("17",lastPostHeaders.get("Content-Length")); } @Test @@ -280,7 +280,7 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), ""); - runner.enqueue("The wilderness downtown.".getBytes(), attrs); + runner.enqueue("The wilderness.".getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -300,7 +300,7 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); - runner.enqueue("Try this trick and spin it.".getBytes(), attrs); + runner.enqueue("Sending with content type property.".getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -321,7 +321,7 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -346,7 +346,7 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -355,7 +355,7 @@ public class TestPostHTTP { Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); // Ensure that the request was not sent with a 'Content-Encoding' header Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); - Assert.assertEquals("6200",lastPostHeaders.get("Content-Length")); + Assert.assertEquals("2100",lastPostHeaders.get("Content-Length")); } @Test @@ -371,7 +371,7 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -394,17 +394,44 @@ public class TestPostHTTP { final Map attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); byte[] postValue = servlet.getLastPost(); - Assert.assertArrayEquals(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(),postValue); + Assert.assertArrayEquals(StringUtils.repeat("Lines of sample text.", 100).getBytes(),postValue); Map lastPostHeaders = servlet.getLastPostHeaders(); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertNull(lastPostHeaders.get("Content-Length")); Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding")); } + + @Test + public void testSendWithThrottler() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false"); + runner.setProperty(PostHTTP.MAX_DATA_RATE, "10kb"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + runner.enqueue(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(), attrs); + + boolean stopOnFinish = true; + runner.run(1, stopOnFinish); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + byte[] postValue = servlet.getLastPost(); + Assert.assertArrayEquals(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(),postValue); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + Assert.assertEquals("4700",lastPostHeaders.get("Content-Length")); + } }