From 528dab78d6dbc3bb4f61663f423b07284936ec40 Mon Sep 17 00:00:00 2001 From: Tony Kurc Date: Mon, 26 Oct 2015 19:41:26 -0400 Subject: [PATCH] NIFI-1073: Fixing coverity discovered errors. Resource leaks, and statics Reviewed by Bryan Bende (bbende@apache.org) --- .../nifi/io/socket/SSLContextFactory.java | 15 ++- .../stream/io/TestLeakyBucketThrottler.java | 104 +++++++++--------- .../processors/WriteResourceToStream.java | 10 +- .../manager/impl/WebClusterManager.java | 12 +- .../repository/FileSystemRepository.java | 9 +- .../repository/StandardProcessSession.java | 8 +- .../repository/VolatileContentRepository.java | 15 ++- .../apache/nifi/web/server/JettyServer.java | 65 +++++------ .../apache/nifi/web/api/dto/DtoFactory.java | 2 +- .../hadoop/TestCreateHadoopSequenceFile.java | 80 ++++++++------ .../processors/standard/AttributesToJSON.java | 2 +- .../processors/standard/ConvertJSONToSQL.java | 34 +++--- .../processors/standard/EncodeContent.java | 12 +- .../nifi/processors/standard/GetHTTP.java | 5 +- .../processors/standard/ListFileTransfer.java | 9 +- .../nifi/processors/standard/PostHTTP.java | 2 + .../nifi/processors/standard/SplitText.java | 2 +- .../util/OpenPGPKeyBasedEncryptor.java | 4 +- .../util/OpenPGPPasswordBasedEncryptor.java | 4 +- .../processors/standard/CaptureServlet.java | 12 +- .../nifi/processors/standard/TestGetHTTP.java | 2 +- .../processors/standard/TestTransformXml.java | 33 +++--- .../pom.xml | 4 + .../DistributedMapCacheClientService.java | 22 ++-- .../DistributedSetCacheClientService.java | 28 ++--- 25 files changed, 272 insertions(+), 223 deletions(-) diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java index 9c6cb82161..ee75500290 100644 --- a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java +++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java @@ -34,6 +34,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.file.FileUtils; public class SSLContextFactory { @@ -58,13 +59,23 @@ public class SSLContextFactory { // prepare the keystore final KeyStore keyStore = KeyStore.getInstance(keystoreType); - keyStore.load(new FileInputStream(keystore), keystorePass); + final FileInputStream keyStoreStream = new FileInputStream(keystore); + try { + keyStore.load(keyStoreStream, keystorePass); + } finally { + FileUtils.closeQuietly(keyStoreStream); + } final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); keyManagerFactory.init(keyStore, keystorePass); // prepare the truststore final KeyStore trustStore = KeyStore.getInstance(truststoreType); - trustStore.load(new FileInputStream(truststore), truststorePass); + final FileInputStream trustStoreStream = new FileInputStream(truststore); + try { + trustStore.load(trustStoreStream, truststorePass); + } finally { + FileUtils.closeQuietly(trustStoreStream); + } final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java index 52bd8dee49..e46577a614 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java @@ -41,75 +41,75 @@ public class TestLeakyBucketThrottler { final byte[] data = new byte[1024 * 1024 * 4]; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); + try (final OutputStream throttledOut = throttler.newThrottledOutputStream(baos)) { - final long start = System.currentTimeMillis(); - throttledOut.write(data); - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); + final long start = System.currentTimeMillis(); + throttledOut.write(data); + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); + } } @Test(timeout = 10000) public void testInputStreamInterface() throws IOException { - // throttle rate at 1 MB/sec - final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); final byte[] data = new byte[1024 * 1024 * 4]; - final ByteArrayInputStream bais = new ByteArrayInputStream(data); - final InputStream throttledIn = throttler.newThrottledInputStream(bais); + // throttle rate at 1 MB/sec + try ( final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final InputStream throttledIn = throttler.newThrottledInputStream(bais); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final byte[] buffer = new byte[4096]; + final long start = System.currentTimeMillis(); + int len; + while ((len = throttledIn.read(buffer)) > 0) { + baos.write(buffer, 0, len); + } - final byte[] buffer = new byte[4096]; - final long start = System.currentTimeMillis(); - int len; - while ((len = throttledIn.read(buffer)) > 0) { - baos.write(buffer, 0, len); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); } - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); - baos.close(); } @Test(timeout = 10000) public void testDirectInterface() throws IOException, InterruptedException { // throttle rate at 1 MB/sec - final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + try (final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + // create 3 threads, each sending ~2 MB + final List threads = new ArrayList(); + for (int i = 0; i < 3; i++) { + final Thread t = new WriterThread(i, throttler, baos); + threads.add(t); + } - // create 3 threads, each sending ~2 MB - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final List threads = new ArrayList(); - for (int i = 0; i < 3; i++) { - final Thread t = new WriterThread(i, throttler, baos); - threads.add(t); + final long start = System.currentTimeMillis(); + for (final Thread t : threads) { + t.start(); + } + + for (final Thread t : threads) { + t.join(); + } + final long elapsed = System.currentTimeMillis() - start; + + throttler.close(); + + // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to + // allow for busy-ness and the fact that we could write a tiny bit more than the limit. + assertTrue(elapsed > 5000); + assertTrue(elapsed < 7000); + + // ensure bytes were copied out appropriately + assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); + assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); } - - final long start = System.currentTimeMillis(); - for (final Thread t : threads) { - t.start(); - } - - for (final Thread t : threads) { - t.join(); - } - final long elapsed = System.currentTimeMillis() - start; - - throttler.close(); - - // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to - // allow for busy-ness and the fact that we could write a tiny bit more than the limit. - assertTrue(elapsed > 5000); - assertTrue(elapsed < 7000); - - // ensure bytes were copied out appropriately - assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); - assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); } private static class WriterThread extends Thread { diff --git a/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java b/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java index 5d595b4417..55eb2d754a 100644 --- a/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java +++ b/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; import java.util.HashSet; @@ -57,13 +58,16 @@ public class WriteResourceToStream extends AbstractProcessor { relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); - + final InputStream resourceStream = Thread.currentThread() + .getContextClassLoader().getResourceAsStream("file.txt"); try { - this.resourceData = IOUtils.toString(Thread.currentThread() - .getContextClassLoader().getResourceAsStream("file.txt")); + this.resourceData = IOUtils.toString(resourceStream); } catch (IOException e) { throw new RuntimeException("Unable to load resources", e); + } finally { + IOUtils.closeQuietly(resourceStream); } + } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index c22e1ec478..5e9dd3c1ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -3402,12 +3402,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C @Override public void run() { try { - ((StreamingOutput) nodeResponse.getResponse().getEntity()).write( - new OutputStream() { - @Override - public void write(final int b) { /* drain response */ } - } - ); + try (final OutputStream drain = new OutputStream() { + @Override + public void write(final int b) { /* drain response */ } + }) { + ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(drain); + } } catch (final IOException | WebApplicationException ex) { logger.info("Failed clearing out non-client response buffer due to: " + ex, ex); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index b4a171690a..c72a19c00a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -58,6 +58,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -808,7 +809,13 @@ public class FileSystemRepository implements ContentRepository { final Path path = getPath(claim, true); final FileInputStream fis = new FileInputStream(path.toFile()); if (claim.getOffset() > 0L) { - StreamUtils.skip(fis, claim.getOffset()); + try { + StreamUtils.skip(fis, claim.getOffset()); + } catch(IOException ioe) { + IOUtils.closeQuietly(fis); + throw ioe; + } + } // see javadocs for claim.getLength() as to why we do this. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index f66894cb18..3709972917 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessorNode; @@ -1759,7 +1760,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return new DisableOnCloseInputStream(currentReadClaimStream); } else { final InputStream rawInStream = context.getContentRepository().read(claim); - StreamUtils.skip(rawInStream, offset); + try { + StreamUtils.skip(rawInStream, offset); + } catch(IOException ioe) { + IOUtils.closeQuietly(rawInStream); + throw ioe; + } return rawInStream; } } catch (final ContentNotFoundException cnfe) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 7c7cade19a..08b7e80a7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.IOUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; @@ -401,14 +402,22 @@ public class VolatileContentRepository implements ContentRepository { @Override public long exportTo(ContentClaim claim, OutputStream destination) throws IOException { final InputStream in = read(claim); - return StreamUtils.copy(in, destination); + try { + return StreamUtils.copy(in, destination); + } finally { + IOUtils.closeQuietly(in); + } } @Override public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException { final InputStream in = read(claim); - StreamUtils.skip(in, offset); - StreamUtils.copy(in, destination, length); + try { + StreamUtils.skip(in, offset); + StreamUtils.copy(in, destination, length); + } finally { + IOUtils.closeQuietly(in); + } return length; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index 99c11a87f6..ecfe2c0c68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -38,8 +38,10 @@ import java.util.Map; import java.util.Set; import java.util.jar.JarEntry; import java.util.jar.JarFile; + import javax.servlet.DispatcherType; import javax.servlet.ServletContext; + import org.apache.nifi.NiFiServer; import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.FlowSynchronizationException; @@ -51,6 +53,7 @@ import org.apache.nifi.services.FlowService; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiWebContext; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.web.ContentAccess; import org.apache.nifi.ui.extension.UiExtension; @@ -372,24 +375,25 @@ public class JettyServer implements NiFiServer { } // get an input stream for the nifi-processor configuration file - BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); + try (BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry)))) { - // read in each configured type - String rawComponentType; - while ((rawComponentType = in.readLine()) != null) { - // extract the component type - final String componentType = extractComponentType(rawComponentType); - if (componentType != null) { - List extensions = uiExtensions.get(uiExtensionType); + // read in each configured type + String rawComponentType; + while ((rawComponentType = in.readLine()) != null) { + // extract the component type + final String componentType = extractComponentType(rawComponentType); + if (componentType != null) { + List extensions = uiExtensions.get(uiExtensionType); - // if there are currently no extensions for this type create it - if (extensions == null) { - extensions = new ArrayList<>(); - uiExtensions.put(uiExtensionType, extensions); + // if there are currently no extensions for this type create it + if (extensions == null) { + extensions = new ArrayList<>(); + uiExtensions.put(uiExtensionType, extensions); + } + + // add the specified type + extensions.add(componentType); } - - // add the specified type - extensions.add(componentType); } } } @@ -437,37 +441,34 @@ public class JettyServer implements NiFiServer { */ private List getWarExtensions(final File war, final String path) { List processorTypes = new ArrayList<>(); + + // load the jar file and attempt to find the nifi-processor entry JarFile jarFile = null; try { - // load the jar file and attempt to find the nifi-processor entry jarFile = new JarFile(war); JarEntry jarEntry = jarFile.getJarEntry(path); // ensure the nifi-processor entry was found if (jarEntry != null) { // get an input stream for the nifi-processor configuration file - BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); + try (final BufferedReader in = new BufferedReader( + new InputStreamReader(jarFile.getInputStream(jarEntry)))) { - // read in each configured type - String rawProcessorType; - while ((rawProcessorType = in.readLine()) != null) { - // extract the processor type - final String processorType = extractComponentType(rawProcessorType); - if (processorType != null) { - processorTypes.add(processorType); + // read in each configured type + String rawProcessorType; + while ((rawProcessorType = in.readLine()) != null) { + // extract the processor type + final String processorType = extractComponentType(rawProcessorType); + if (processorType != null) { + processorTypes.add(processorType); + } } } } } catch (IOException ioe) { - logger.warn(String.format("Unable to inspect %s for a custom processor UI.", war)); + logger.warn("Unable to inspect {} for a custom processor UI.", new Object[]{war, ioe}); } finally { - try { - // close the jar file - which closes all input streams obtained via getInputStream above - if (jarFile != null) { - jarFile.close(); - } - } catch (IOException ioe) { - } + IOUtils.closeQuietly(jarFile); } return processorTypes; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index c71c0cf785..87277aefc2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -148,7 +148,7 @@ public final class DtoFactory { } }; - final int MAX_BULLETINS_PER_COMPONENT = 5; + final static int MAX_BULLETINS_PER_COMPONENT = 5; private ControllerServiceLookup controllerServiceLookup; diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java index 430b4fa61f..5357dff705 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.HashMap; @@ -71,9 +70,11 @@ public class TestCreateHadoopSequenceFile { } @Test - public void testSimpleCase() throws FileNotFoundException { + public void testSimpleCase() throws IOException { for (File inFile : inFiles) { - controller.enqueue(new FileInputStream(inFile)); + try (FileInputStream fin = new FileInputStream(inFile) ) { + controller.enqueue(fin); + } } controller.run(3); @@ -88,7 +89,9 @@ public class TestCreateHadoopSequenceFile { @Test public void testSequenceFileSaysValueIsBytesWritable() throws UnsupportedEncodingException, IOException { for (File inFile : inFiles) { - controller.enqueue(new FileInputStream(inFile)); + try (FileInputStream fin = new FileInputStream(inFile) ){ + controller.enqueue(fin); + } } controller.run(3); @@ -118,35 +121,39 @@ public class TestCreateHadoopSequenceFile { } @Test - public void testMergedTarData() throws FileNotFoundException { + public void testMergedTarData() throws IOException { Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/tar"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545312236534130.tar"), attributes); - controller.run(); - List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try (final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545312236534130.tar")) { + controller.enqueue(fin, attributes); + controller.run(); + List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. + assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } } @Test public void testMergedZipData() throws IOException { Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545423550275052.zip"), attributes); - controller.run(); - List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try (FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545423550275052.zip")){ + controller.enqueue(fin, attributes); + controller.run(); + List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. + assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } // FileOutputStream fos = new FileOutputStream("zip-3-randoms.sf"); // fos.write(data); // fos.flush(); @@ -157,16 +164,19 @@ public class TestCreateHadoopSequenceFile { public void testMergedFlowfilePackagedData() throws IOException { Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545479542069498.pkg"), attributes); - controller.run(); - List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try ( final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545479542069498.pkg")) { + controller.enqueue(fin, attributes); + + controller.run(); + List successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. + assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } // FileOutputStream fos = new FileOutputStream("flowfilePkg-3-randoms.sf"); // fos.write(data); // fos.flush(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index 0ab3ca758f..78b8d5843b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor { public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; public static final String DESTINATION_CONTENT = "flowfile-content"; - private final String APPLICATION_JSON = "application/json"; + private static final String APPLICATION_JSON = "application/json"; public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 95919609cc..6f14800709 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -561,25 +561,27 @@ public class ConvertJSONToSQL extends AbstractProcessor { public static TableSchema from(final Connection conn, final String catalog, final String tableName, final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); + try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) { - final List cols = new ArrayList<>(); - while (colrs.next()) { - final ColumnDescription col = ColumnDescription.from(colrs); - cols.add(col); - } - - final Set primaryKeyColumns = new HashSet<>(); - if (includePrimaryKeys) { - final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName); - - while (pkrs.next()) { - final String colName = pkrs.getString("COLUMN_NAME"); - primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + final List cols = new ArrayList<>(); + while (colrs.next()) { + final ColumnDescription col = ColumnDescription.from(colrs); + cols.add(col); } - } - return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + final Set primaryKeyColumns = new HashSet<>(); + if (includePrimaryKeys) { + try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) { + + while (pkrs.next()) { + final String colName = pkrs.getString("COLUMN_NAME"); + primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + } + } + } + + return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java index de81fe5ce7..d1e010d9c5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java @@ -171,7 +171,7 @@ public class EncodeContent extends AbstractProcessor { } } - private class EncodeBase64 implements StreamCallback { + private static class EncodeBase64 implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -181,7 +181,7 @@ public class EncodeContent extends AbstractProcessor { } } - private class DecodeBase64 implements StreamCallback { + private static class DecodeBase64 implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -191,7 +191,7 @@ public class EncodeContent extends AbstractProcessor { } } - private class EncodeBase32 implements StreamCallback { + private static class EncodeBase32 implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -201,7 +201,7 @@ public class EncodeContent extends AbstractProcessor { } } - private class DecodeBase32 implements StreamCallback { + private static class DecodeBase32 implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -213,7 +213,7 @@ public class EncodeContent extends AbstractProcessor { private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; - private class EncodeHex implements StreamCallback { + private static class EncodeHex implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -231,7 +231,7 @@ public class EncodeContent extends AbstractProcessor { } } - private class DecodeHex implements StreamCallback { + private static class DecodeHex implements StreamCallback { @Override public void process(InputStream in, OutputStream out) throws IOException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 224508059a..5eab704e81 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -56,7 +56,6 @@ import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.config.Registry; @@ -67,6 +66,7 @@ import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.ssl.SSLContextBuilder; @@ -438,7 +438,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } // create the http client - final HttpClient client = clientBuilder.build(); + final CloseableHttpClient client = clientBuilder.build(); // create request final HttpGet get = new HttpGet(url); @@ -539,7 +539,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t); throw t; } - } finally { conMan.shutdown(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index b6c8c286bf..2cc3aae383 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -92,7 +93,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor { @Override protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { final FileTransfer transfer = getFileTransfer(context); - final List listing = transfer.getListing(); + final List listing; + try { + listing = transfer.getListing(); + } finally { + IOUtils.closeQuietly(transfer); + } + if (minTimestamp == null) { return listing; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index ef84629f49..c830ac0d9d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -49,6 +49,7 @@ import javax.net.ssl.SSLSession; import javax.security.cert.X509Certificate; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.io.IOUtils; import org.apache.http.Header; import org.apache.http.HttpException; import org.apache.http.HttpHost; @@ -637,6 +638,7 @@ public class PostHTTP extends AbstractProcessor { + "configured to deliver FlowFiles; rolling back session", new Object[]{url}); session.rollback(); context.yield(); + IOUtils.closeQuietly(client); return; } } else { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index e966880c15..678919c89f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -353,7 +353,7 @@ public class SplitText extends AbstractProcessor { } } - private class SplitInfo { + private static class SplitInfo { public long offsetBytes; public long lengthBytes; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java index f41d603e73..b91a529427 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java @@ -134,7 +134,7 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { return null; } - private class OpenPGPDecryptCallback implements StreamCallback { + private static class OpenPGPDecryptCallback implements StreamCallback { private String provider; private String secretKeyring; @@ -216,7 +216,7 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor { } - private class OpenPGPEncryptCallback implements StreamCallback { + private static class OpenPGPEncryptCallback implements StreamCallback { private String algorithm; private String provider; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java index 3e870df662..b09d44448a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java @@ -65,7 +65,7 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor { return new OpenPGPDecryptCallback(provider, password); } - private class OpenPGPDecryptCallback implements StreamCallback { + private static class OpenPGPDecryptCallback implements StreamCallback { private String provider; private char[] password; @@ -120,7 +120,7 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor { } - private class OpenPGPEncryptCallback implements StreamCallback { + private static class OpenPGPEncryptCallback implements StreamCallback { private String algorithm; private String provider; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java index d6c87d69c8..a1398f4ade 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java @@ -24,8 +24,9 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response.Status; -import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.file.FileUtils; public class CaptureServlet extends HttpServlet { @@ -40,9 +41,12 @@ public class CaptureServlet extends HttpServlet { @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - StreamUtils.copy(request.getInputStream(), baos); - this.lastPost = baos.toByteArray(); - + try{ + StreamUtils.copy(request.getInputStream(), baos); + this.lastPost = baos.toByteArray(); + } finally{ + FileUtils.closeQuietly(baos); + } response.setStatus(Status.OK.getStatusCode()); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java index 29ce4299f5..5aec796788 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java @@ -375,7 +375,7 @@ public class TestGetHTTP { // Use context service with a keystore and a truststore useSSLContextService(twoWaySslProperties); - controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "10 secs"); controller.setProperty(GetHTTP.URL, destination); controller.setProperty(GetHTTP.FILENAME, "testFile"); controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java index 7074ec95e4..2dbf09f8fc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java @@ -20,7 +20,6 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; @@ -93,23 +92,25 @@ public class TestTransformXml { StringBuilder builder = new StringBuilder(); builder.append("\n"); - InputStream in = new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv")); - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + try(BufferedReader reader = new BufferedReader(new InputStreamReader( + new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv"))))){ - String line = null; - while ((line = reader.readLine()) != null) { - builder.append(line).append("\n"); + + String line = null; + while ((line = reader.readLine()) != null) { + builder.append(line).append("\n"); + } + builder.append(""); + String data = builder.toString(); + runner.enqueue(data.getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0); + final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1); + + transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml")); } - builder.append(""); - String data = builder.toString(); - runner.enqueue(data.getBytes(), attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS); - final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0); - final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1); - - transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml")); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml index e05f4dc2a0..e335b1ea10 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml @@ -47,5 +47,9 @@ org.apache.nifi nifi-ssl-context-service-api + + commons-io + commons-io + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index c03dd5afc6..e78ae0b1f9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -298,27 +299,18 @@ public class DistributedMapCacheClientService extends AbstractControllerService if (closed) { throw new IllegalStateException("Client is closed"); } - + boolean tryToRequeue = true; final CommsSession session = leaseCommsSession(); try { return action.execute(session); } catch (final IOException ioe) { - try { - session.close(); - } catch (final IOException ignored) { - } - + tryToRequeue = false; throw ioe; } finally { - if (!session.isClosed()) { - if (this.closed) { - try { - session.close(); - } catch (final IOException ioe) { - } - } else { - queue.offer(session); - } + if (tryToRequeue == true && this.closed == false) { + queue.offer(session); + } else { + IOUtils.closeQuietly(session); } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index 8c95c77f54..82ab643f8f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -127,10 +128,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService try { ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); } catch (final HandshakeException e) { - try { - session.close(); - } catch (final IOException ioe) { - } + IOUtils.closeQuietly(session); throw new IOException(e); } @@ -162,9 +160,9 @@ public class DistributedSetCacheClientService extends AbstractControllerService try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) { dos.writeUTF("close"); dos.flush(); - commsSession.close(); } catch (final IOException e) { } + IOUtils.closeQuietly(commsSession); } if (logger.isDebugEnabled() && getIdentifier() != null) { logger.debug("Closed {}", new Object[]{getIdentifier()}); @@ -185,6 +183,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService } final CommsSession session = leaseCommsSession(); + boolean tryToRequeue = true; try { final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); dos.writeUTF(methodName); @@ -198,22 +197,13 @@ public class DistributedSetCacheClientService extends AbstractControllerService final DataInputStream dis = new DataInputStream(session.getInputStream()); return dis.readBoolean(); } catch (final IOException ioe) { - try { - session.close(); - } catch (final IOException ignored) { - } - + tryToRequeue = false; throw ioe; } finally { - if (!session.isClosed()) { - if (this.closed) { - try { - session.close(); - } catch (final IOException ioe) { - } - } else { - queue.offer(session); - } + if (tryToRequeue == true && this.closed == false) { + queue.offer(session); + } else { + IOUtils.closeQuietly(session); } } }