NIFI-1865 Close StreamThrottler when processor stops.

- Also, replaced copyrighted sample texts for tests

Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
Koji Kawamura 2016-05-11 08:08:58 +09:00 committed by Matt Burgess
parent 1370eefdd7
commit 7bd2c64adb
3 changed files with 58 additions and 10 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -136,6 +137,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
private volatile Server server = null; private volatile Server server = null;
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>(); private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>();
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -166,6 +168,15 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
@OnStopped @OnStopped
public void shutdownHttpServer() { public void shutdownHttpServer() {
final StreamThrottler throttler = throttlerRef.getAndSet(null);
if(throttler != null) {
try {
throttler.close();
} catch (IOException e) {
getLogger().error("Failed to close StreamThrottler", e);
}
}
final Server toShutdown = this.server; final Server toShutdown = this.server;
if (toShutdown == null) { if (toShutdown == null) {
return; return;
@ -185,6 +196,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
throttlerRef.set(streamThrottler);
final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null; final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null;

View File

@ -351,6 +351,15 @@ public class PostHTTP extends AbstractProcessor {
} }
configMap.clear(); configMap.clear();
final StreamThrottler throttler = throttlerRef.getAndSet(null);
if(throttler != null) {
try {
throttler.close();
} catch (IOException e) {
getLogger().error("Failed to close StreamThrottler", e);
}
}
} }
@OnScheduled @OnScheduled

View File

@ -261,7 +261,7 @@ public class TestPostHTTP {
final String suppliedMimeType = "text/plain"; final String suppliedMimeType = "text/plain";
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Camping is in tents.".getBytes(), attrs); runner.enqueue("Camping is great!".getBytes(), attrs);
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false"); runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.run(1); runner.run(1);
@ -269,7 +269,7 @@ public class TestPostHTTP {
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertEquals("20",lastPostHeaders.get("Content-Length")); Assert.assertEquals("17",lastPostHeaders.get("Content-Length"));
} }
@Test @Test
@ -280,7 +280,7 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), ""); attrs.put(CoreAttributes.MIME_TYPE.key(), "");
runner.enqueue("The wilderness downtown.".getBytes(), attrs); runner.enqueue("The wilderness.".getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@ -300,7 +300,7 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
runner.enqueue("Try this trick and spin it.".getBytes(), attrs); runner.enqueue("Sending with content type property.".getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@ -321,7 +321,7 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@ -346,7 +346,7 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@ -355,7 +355,7 @@ public class TestPostHTTP {
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header // Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
Assert.assertEquals("6200",lastPostHeaders.get("Content-Length")); Assert.assertEquals("2100",lastPostHeaders.get("Content-Length"));
} }
@Test @Test
@ -371,7 +371,7 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@ -394,17 +394,44 @@ public class TestPostHTTP {
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
byte[] postValue = servlet.getLastPost(); byte[] postValue = servlet.getLastPost();
Assert.assertArrayEquals(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(),postValue); Assert.assertArrayEquals(StringUtils.repeat("Lines of sample text.", 100).getBytes(),postValue);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertNull(lastPostHeaders.get("Content-Length")); Assert.assertNull(lastPostHeaders.get("Content-Length"));
Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding")); Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding"));
} }
@Test
public void testSendWithThrottler() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.setProperty(PostHTTP.MAX_DATA_RATE, "10kb");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(), attrs);
boolean stopOnFinish = true;
runner.run(1, stopOnFinish);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
byte[] postValue = servlet.getLastPost();
Assert.assertArrayEquals(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(),postValue);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertEquals("4700",lastPostHeaders.get("Content-Length"));
}
} }