NIFI-1073: Fixing coverity discovered errors. Resource leaks, and statics

Reviewed by Bryan Bende (bbende@apache.org)
This commit is contained in:
Tony Kurc 2015-10-26 19:41:26 -04:00
parent 01539ed323
commit 528dab78d6
25 changed files with 272 additions and 223 deletions

View File

@ -34,6 +34,7 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
public class SSLContextFactory { public class SSLContextFactory {
@ -58,13 +59,23 @@ public class SSLContextFactory {
// prepare the keystore // prepare the keystore
final KeyStore keyStore = KeyStore.getInstance(keystoreType); final KeyStore keyStore = KeyStore.getInstance(keystoreType);
keyStore.load(new FileInputStream(keystore), keystorePass); final FileInputStream keyStoreStream = new FileInputStream(keystore);
try {
keyStore.load(keyStoreStream, keystorePass);
} finally {
FileUtils.closeQuietly(keyStoreStream);
}
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keystorePass); keyManagerFactory.init(keyStore, keystorePass);
// prepare the truststore // prepare the truststore
final KeyStore trustStore = KeyStore.getInstance(truststoreType); final KeyStore trustStore = KeyStore.getInstance(truststoreType);
trustStore.load(new FileInputStream(truststore), truststorePass); final FileInputStream trustStoreStream = new FileInputStream(truststore);
try {
trustStore.load(trustStoreStream, truststorePass);
} finally {
FileUtils.closeQuietly(trustStoreStream);
}
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore); trustManagerFactory.init(trustStore);

View File

@ -41,75 +41,75 @@ public class TestLeakyBucketThrottler {
final byte[] data = new byte[1024 * 1024 * 4]; final byte[] data = new byte[1024 * 1024 * 4];
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); try (final OutputStream throttledOut = throttler.newThrottledOutputStream(baos)) {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
throttledOut.write(data); throttledOut.write(data);
throttler.close(); throttler.close();
final long millis = System.currentTimeMillis() - start; final long millis = System.currentTimeMillis() - start;
// should take 4 sec give or take // should take 4 sec give or take
assertTrue(millis > 3000); assertTrue(millis > 3000);
assertTrue(millis < 6000); assertTrue(millis < 6000);
}
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testInputStreamInterface() throws IOException { public void testInputStreamInterface() throws IOException {
// throttle rate at 1 MB/sec
final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final byte[] data = new byte[1024 * 1024 * 4]; final byte[] data = new byte[1024 * 1024 * 4];
final ByteArrayInputStream bais = new ByteArrayInputStream(data); // throttle rate at 1 MB/sec
final InputStream throttledIn = throttler.newThrottledInputStream(bais); try ( final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final ByteArrayInputStream bais = new ByteArrayInputStream(data);
final InputStream throttledIn = throttler.newThrottledInputStream(bais);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final byte[] buffer = new byte[4096];
final long start = System.currentTimeMillis();
int len;
while ((len = throttledIn.read(buffer)) > 0) {
baos.write(buffer, 0, len);
}
final byte[] buffer = new byte[4096]; final long millis = System.currentTimeMillis() - start;
final long start = System.currentTimeMillis(); // should take 4 sec give or take
int len; assertTrue(millis > 3000);
while ((len = throttledIn.read(buffer)) > 0) { assertTrue(millis < 6000);
baos.write(buffer, 0, len);
} }
throttler.close();
final long millis = System.currentTimeMillis() - start;
// should take 4 sec give or take
assertTrue(millis > 3000);
assertTrue(millis < 6000);
baos.close();
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testDirectInterface() throws IOException, InterruptedException { public void testDirectInterface() throws IOException, InterruptedException {
// throttle rate at 1 MB/sec // throttle rate at 1 MB/sec
final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); try (final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
// create 3 threads, each sending ~2 MB
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 3; i++) {
final Thread t = new WriterThread(i, throttler, baos);
threads.add(t);
}
// create 3 threads, each sending ~2 MB final long start = System.currentTimeMillis();
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); for (final Thread t : threads) {
final List<Thread> threads = new ArrayList<Thread>(); t.start();
for (int i = 0; i < 3; i++) { }
final Thread t = new WriterThread(i, throttler, baos);
threads.add(t); for (final Thread t : threads) {
t.join();
}
final long elapsed = System.currentTimeMillis() - start;
throttler.close();
// To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
// allow for busy-ness and the fact that we could write a tiny bit more than the limit.
assertTrue(elapsed > 5000);
assertTrue(elapsed < 7000);
// ensure bytes were copied out appropriately
assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
} }
final long start = System.currentTimeMillis();
for (final Thread t : threads) {
t.start();
}
for (final Thread t : threads) {
t.join();
}
final long elapsed = System.currentTimeMillis() - start;
throttler.close();
// To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
// allow for busy-ness and the fact that we could write a tiny bit more than the limit.
assertTrue(elapsed > 5000);
assertTrue(elapsed < 7000);
// ensure bytes were copied out appropriately
assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
} }
private static class WriterThread extends Thread { private static class WriterThread extends Thread {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors; package org.apache.nifi.processors;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -57,13 +58,16 @@ public class WriteResourceToStream extends AbstractProcessor {
relationships.add(REL_SUCCESS); relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE); relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships); this.relationships = Collections.unmodifiableSet(relationships);
final InputStream resourceStream = Thread.currentThread()
.getContextClassLoader().getResourceAsStream("file.txt");
try { try {
this.resourceData = IOUtils.toString(Thread.currentThread() this.resourceData = IOUtils.toString(resourceStream);
.getContextClassLoader().getResourceAsStream("file.txt"));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Unable to load resources", e); throw new RuntimeException("Unable to load resources", e);
} finally {
IOUtils.closeQuietly(resourceStream);
} }
} }
@Override @Override

View File

@ -3402,12 +3402,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override @Override
public void run() { public void run() {
try { try {
((StreamingOutput) nodeResponse.getResponse().getEntity()).write( try (final OutputStream drain = new OutputStream() {
new OutputStream() { @Override
@Override public void write(final int b) { /* drain response */ }
public void write(final int b) { /* drain response */ } }) {
} ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(drain);
); }
} catch (final IOException | WebApplicationException ex) { } catch (final IOException | WebApplicationException ex) {
logger.info("Failed clearing out non-client response buffer due to: " + ex, ex); logger.info("Failed clearing out non-client response buffer due to: " + ex, ex);
} }

View File

@ -58,6 +58,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
@ -808,7 +809,13 @@ public class FileSystemRepository implements ContentRepository {
final Path path = getPath(claim, true); final Path path = getPath(claim, true);
final FileInputStream fis = new FileInputStream(path.toFile()); final FileInputStream fis = new FileInputStream(path.toFile());
if (claim.getOffset() > 0L) { if (claim.getOffset() > 0L) {
StreamUtils.skip(fis, claim.getOffset()); try {
StreamUtils.skip(fis, claim.getOffset());
} catch(IOException ioe) {
IOUtils.closeQuietly(fis);
throw ioe;
}
} }
// see javadocs for claim.getLength() as to why we do this. // see javadocs for claim.getLength() as to why we do this.

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
@ -1759,7 +1760,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return new DisableOnCloseInputStream(currentReadClaimStream); return new DisableOnCloseInputStream(currentReadClaimStream);
} else { } else {
final InputStream rawInStream = context.getContentRepository().read(claim); final InputStream rawInStream = context.getContentRepository().read(claim);
StreamUtils.skip(rawInStream, offset); try {
StreamUtils.skip(rawInStream, offset);
} catch(IOException ioe) {
IOUtils.closeQuietly(rawInStream);
throw ioe;
}
return rawInStream; return rawInStream;
} }
} catch (final ContentNotFoundException cnfe) { } catch (final ContentNotFoundException cnfe) {

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@ -401,14 +402,22 @@ public class VolatileContentRepository implements ContentRepository {
@Override @Override
public long exportTo(ContentClaim claim, OutputStream destination) throws IOException { public long exportTo(ContentClaim claim, OutputStream destination) throws IOException {
final InputStream in = read(claim); final InputStream in = read(claim);
return StreamUtils.copy(in, destination); try {
return StreamUtils.copy(in, destination);
} finally {
IOUtils.closeQuietly(in);
}
} }
@Override @Override
public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException { public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException {
final InputStream in = read(claim); final InputStream in = read(claim);
StreamUtils.skip(in, offset); try {
StreamUtils.copy(in, destination, length); StreamUtils.skip(in, offset);
StreamUtils.copy(in, destination, length);
} finally {
IOUtils.closeQuietly(in);
}
return length; return length;
} }

View File

@ -38,8 +38,10 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.jar.JarEntry; import java.util.jar.JarEntry;
import java.util.jar.JarFile; import java.util.jar.JarFile;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import org.apache.nifi.NiFiServer; import org.apache.nifi.NiFiServer;
import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException; import org.apache.nifi.controller.FlowSynchronizationException;
@ -51,6 +53,7 @@ import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiWebContext; import org.apache.nifi.web.NiFiWebContext;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.web.ContentAccess; import org.apache.nifi.web.ContentAccess;
import org.apache.nifi.ui.extension.UiExtension; import org.apache.nifi.ui.extension.UiExtension;
@ -372,24 +375,25 @@ public class JettyServer implements NiFiServer {
} }
// get an input stream for the nifi-processor configuration file // get an input stream for the nifi-processor configuration file
BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); try (BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry)))) {
// read in each configured type // read in each configured type
String rawComponentType; String rawComponentType;
while ((rawComponentType = in.readLine()) != null) { while ((rawComponentType = in.readLine()) != null) {
// extract the component type // extract the component type
final String componentType = extractComponentType(rawComponentType); final String componentType = extractComponentType(rawComponentType);
if (componentType != null) { if (componentType != null) {
List<String> extensions = uiExtensions.get(uiExtensionType); List<String> extensions = uiExtensions.get(uiExtensionType);
// if there are currently no extensions for this type create it // if there are currently no extensions for this type create it
if (extensions == null) { if (extensions == null) {
extensions = new ArrayList<>(); extensions = new ArrayList<>();
uiExtensions.put(uiExtensionType, extensions); uiExtensions.put(uiExtensionType, extensions);
}
// add the specified type
extensions.add(componentType);
} }
// add the specified type
extensions.add(componentType);
} }
} }
} }
@ -437,37 +441,34 @@ public class JettyServer implements NiFiServer {
*/ */
private List<String> getWarExtensions(final File war, final String path) { private List<String> getWarExtensions(final File war, final String path) {
List<String> processorTypes = new ArrayList<>(); List<String> processorTypes = new ArrayList<>();
// load the jar file and attempt to find the nifi-processor entry
JarFile jarFile = null; JarFile jarFile = null;
try { try {
// load the jar file and attempt to find the nifi-processor entry
jarFile = new JarFile(war); jarFile = new JarFile(war);
JarEntry jarEntry = jarFile.getJarEntry(path); JarEntry jarEntry = jarFile.getJarEntry(path);
// ensure the nifi-processor entry was found // ensure the nifi-processor entry was found
if (jarEntry != null) { if (jarEntry != null) {
// get an input stream for the nifi-processor configuration file // get an input stream for the nifi-processor configuration file
BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); try (final BufferedReader in = new BufferedReader(
new InputStreamReader(jarFile.getInputStream(jarEntry)))) {
// read in each configured type // read in each configured type
String rawProcessorType; String rawProcessorType;
while ((rawProcessorType = in.readLine()) != null) { while ((rawProcessorType = in.readLine()) != null) {
// extract the processor type // extract the processor type
final String processorType = extractComponentType(rawProcessorType); final String processorType = extractComponentType(rawProcessorType);
if (processorType != null) { if (processorType != null) {
processorTypes.add(processorType); processorTypes.add(processorType);
}
} }
} }
} }
} catch (IOException ioe) { } catch (IOException ioe) {
logger.warn(String.format("Unable to inspect %s for a custom processor UI.", war)); logger.warn("Unable to inspect {} for a custom processor UI.", new Object[]{war, ioe});
} finally { } finally {
try { IOUtils.closeQuietly(jarFile);
// close the jar file - which closes all input streams obtained via getInputStream above
if (jarFile != null) {
jarFile.close();
}
} catch (IOException ioe) {
}
} }
return processorTypes; return processorTypes;

View File

@ -148,7 +148,7 @@ public final class DtoFactory {
} }
}; };
final int MAX_BULLETINS_PER_COMPONENT = 5; final static int MAX_BULLETINS_PER_COMPONENT = 5;
private ControllerServiceLookup controllerServiceLookup; private ControllerServiceLookup controllerServiceLookup;

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.HashMap; import java.util.HashMap;
@ -71,9 +70,11 @@ public class TestCreateHadoopSequenceFile {
} }
@Test @Test
public void testSimpleCase() throws FileNotFoundException { public void testSimpleCase() throws IOException {
for (File inFile : inFiles) { for (File inFile : inFiles) {
controller.enqueue(new FileInputStream(inFile)); try (FileInputStream fin = new FileInputStream(inFile) ) {
controller.enqueue(fin);
}
} }
controller.run(3); controller.run(3);
@ -88,7 +89,9 @@ public class TestCreateHadoopSequenceFile {
@Test @Test
public void testSequenceFileSaysValueIsBytesWritable() throws UnsupportedEncodingException, IOException { public void testSequenceFileSaysValueIsBytesWritable() throws UnsupportedEncodingException, IOException {
for (File inFile : inFiles) { for (File inFile : inFiles) {
controller.enqueue(new FileInputStream(inFile)); try (FileInputStream fin = new FileInputStream(inFile) ){
controller.enqueue(fin);
}
} }
controller.run(3); controller.run(3);
@ -118,35 +121,39 @@ public class TestCreateHadoopSequenceFile {
} }
@Test @Test
public void testMergedTarData() throws FileNotFoundException { public void testMergedTarData() throws IOException {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/tar"); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/tar");
controller.enqueue(new FileInputStream("src/test/resources/testdata/13545312236534130.tar"), attributes); try (final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545312236534130.tar")) {
controller.run(); controller.enqueue(fin, attributes);
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); controller.run();
assertEquals(1, successSeqFiles.size()); List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
final byte[] data = successSeqFiles.iterator().next().toByteArray(); assertEquals(1, successSeqFiles.size());
// Data should be greater than 1000000 because that's the size of 2 of our input files, final byte[] data = successSeqFiles.iterator().next().toByteArray();
// and the file size should contain all of that plus headers, but the headers should only // Data should be greater than 1000000 because that's the size of 2 of our input files,
// be a couple hundred bytes. // and the file size should contain all of that plus headers, but the headers should only
assertTrue(data.length > 1000000); // be a couple hundred bytes.
assertTrue(data.length < 1501000); assertTrue(data.length > 1000000);
assertTrue(data.length < 1501000);
}
} }
@Test @Test
public void testMergedZipData() throws IOException { public void testMergedZipData() throws IOException {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip"); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
controller.enqueue(new FileInputStream("src/test/resources/testdata/13545423550275052.zip"), attributes); try (FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545423550275052.zip")){
controller.run(); controller.enqueue(fin, attributes);
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); controller.run();
assertEquals(1, successSeqFiles.size()); List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
final byte[] data = successSeqFiles.iterator().next().toByteArray(); assertEquals(1, successSeqFiles.size());
// Data should be greater than 1000000 because that's the size of 2 of our input files, final byte[] data = successSeqFiles.iterator().next().toByteArray();
// and the file size should contain all of that plus headers, but the headers should only // Data should be greater than 1000000 because that's the size of 2 of our input files,
// be a couple hundred bytes. // and the file size should contain all of that plus headers, but the headers should only
assertTrue(data.length > 1000000); // be a couple hundred bytes.
assertTrue(data.length < 1501000); assertTrue(data.length > 1000000);
assertTrue(data.length < 1501000);
}
// FileOutputStream fos = new FileOutputStream("zip-3-randoms.sf"); // FileOutputStream fos = new FileOutputStream("zip-3-randoms.sf");
// fos.write(data); // fos.write(data);
// fos.flush(); // fos.flush();
@ -157,16 +164,19 @@ public class TestCreateHadoopSequenceFile {
public void testMergedFlowfilePackagedData() throws IOException { public void testMergedFlowfilePackagedData() throws IOException {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3"); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3");
controller.enqueue(new FileInputStream("src/test/resources/testdata/13545479542069498.pkg"), attributes); try ( final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545479542069498.pkg")) {
controller.run(); controller.enqueue(fin, attributes);
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
assertEquals(1, successSeqFiles.size()); controller.run();
final byte[] data = successSeqFiles.iterator().next().toByteArray(); List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
// Data should be greater than 1000000 because that's the size of 2 of our input files, assertEquals(1, successSeqFiles.size());
// and the file size should contain all of that plus headers, but the headers should only final byte[] data = successSeqFiles.iterator().next().toByteArray();
// be a couple hundred bytes. // Data should be greater than 1000000 because that's the size of 2 of our input files,
assertTrue(data.length > 1000000); // and the file size should contain all of that plus headers, but the headers should only
assertTrue(data.length < 1501000); // be a couple hundred bytes.
assertTrue(data.length > 1000000);
assertTrue(data.length < 1501000);
}
// FileOutputStream fos = new FileOutputStream("flowfilePkg-3-randoms.sf"); // FileOutputStream fos = new FileOutputStream("flowfilePkg-3-randoms.sf");
// fos.write(data); // fos.write(data);
// fos.flush(); // fos.flush();

View File

@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor {
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content"; public static final String DESTINATION_CONTENT = "flowfile-content";
private final String APPLICATION_JSON = "application/json"; private static final String APPLICATION_JSON = "application/json";
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()

View File

@ -561,25 +561,27 @@ public class ConvertJSONToSQL extends AbstractProcessor {
public static TableSchema from(final Connection conn, final String catalog, final String tableName, public static TableSchema from(final Connection conn, final String catalog, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>(); final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) { while (colrs.next()) {
final ColumnDescription col = ColumnDescription.from(colrs); final ColumnDescription col = ColumnDescription.from(colrs);
cols.add(col); cols.add(col);
}
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName);
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
} }
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns); final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
}
} }
} }

View File

@ -171,7 +171,7 @@ public class EncodeContent extends AbstractProcessor {
} }
} }
private class EncodeBase64 implements StreamCallback { private static class EncodeBase64 implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {
@ -181,7 +181,7 @@ public class EncodeContent extends AbstractProcessor {
} }
} }
private class DecodeBase64 implements StreamCallback { private static class DecodeBase64 implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {
@ -191,7 +191,7 @@ public class EncodeContent extends AbstractProcessor {
} }
} }
private class EncodeBase32 implements StreamCallback { private static class EncodeBase32 implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {
@ -201,7 +201,7 @@ public class EncodeContent extends AbstractProcessor {
} }
} }
private class DecodeBase32 implements StreamCallback { private static class DecodeBase32 implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {
@ -213,7 +213,7 @@ public class EncodeContent extends AbstractProcessor {
private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
private class EncodeHex implements StreamCallback { private static class EncodeHex implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {
@ -231,7 +231,7 @@ public class EncodeContent extends AbstractProcessor {
} }
} }
private class DecodeHex implements StreamCallback { private static class DecodeHex implements StreamCallback {
@Override @Override
public void process(InputStream in, OutputStream out) throws IOException { public void process(InputStream in, OutputStream out) throws IOException {

View File

@ -56,7 +56,6 @@ import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.Registry; import org.apache.http.config.Registry;
@ -67,6 +66,7 @@ import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
@ -438,7 +438,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
} }
// create the http client // create the http client
final HttpClient client = clientBuilder.build(); final CloseableHttpClient client = clientBuilder.build();
// create request // create request
final HttpGet get = new HttpGet(url); final HttpGet get = new HttpGet(url);
@ -539,7 +539,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t); logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t);
throw t; throw t;
} }
} finally { } finally {
conMan.shutdown(); conMan.shutdown();
} }

View File

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -92,7 +93,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
@Override @Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final FileTransfer transfer = getFileTransfer(context); final FileTransfer transfer = getFileTransfer(context);
final List<FileInfo> listing = transfer.getListing(); final List<FileInfo> listing;
try {
listing = transfer.getListing();
} finally {
IOUtils.closeQuietly(transfer);
}
if (minTimestamp == null) { if (minTimestamp == null) {
return listing; return listing;
} }

View File

@ -49,6 +49,7 @@ import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate; import javax.security.cert.X509Certificate;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header; import org.apache.http.Header;
import org.apache.http.HttpException; import org.apache.http.HttpException;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
@ -637,6 +638,7 @@ public class PostHTTP extends AbstractProcessor {
+ "configured to deliver FlowFiles; rolling back session", new Object[]{url}); + "configured to deliver FlowFiles; rolling back session", new Object[]{url});
session.rollback(); session.rollback();
context.yield(); context.yield();
IOUtils.closeQuietly(client);
return; return;
} }
} else { } else {

View File

@ -353,7 +353,7 @@ public class SplitText extends AbstractProcessor {
} }
} }
private class SplitInfo { private static class SplitInfo {
public long offsetBytes; public long offsetBytes;
public long lengthBytes; public long lengthBytes;

View File

@ -134,7 +134,7 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor {
return null; return null;
} }
private class OpenPGPDecryptCallback implements StreamCallback { private static class OpenPGPDecryptCallback implements StreamCallback {
private String provider; private String provider;
private String secretKeyring; private String secretKeyring;
@ -216,7 +216,7 @@ public class OpenPGPKeyBasedEncryptor implements Encryptor {
} }
private class OpenPGPEncryptCallback implements StreamCallback { private static class OpenPGPEncryptCallback implements StreamCallback {
private String algorithm; private String algorithm;
private String provider; private String provider;

View File

@ -65,7 +65,7 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor {
return new OpenPGPDecryptCallback(provider, password); return new OpenPGPDecryptCallback(provider, password);
} }
private class OpenPGPDecryptCallback implements StreamCallback { private static class OpenPGPDecryptCallback implements StreamCallback {
private String provider; private String provider;
private char[] password; private char[] password;
@ -120,7 +120,7 @@ public class OpenPGPPasswordBasedEncryptor implements Encryptor {
} }
private class OpenPGPEncryptCallback implements StreamCallback { private static class OpenPGPEncryptCallback implements StreamCallback {
private String algorithm; private String algorithm;
private String provider; private String provider;

View File

@ -24,8 +24,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.file.FileUtils;
public class CaptureServlet extends HttpServlet { public class CaptureServlet extends HttpServlet {
@ -40,9 +41,12 @@ public class CaptureServlet extends HttpServlet {
@Override @Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
StreamUtils.copy(request.getInputStream(), baos); try{
this.lastPost = baos.toByteArray(); StreamUtils.copy(request.getInputStream(), baos);
this.lastPost = baos.toByteArray();
} finally{
FileUtils.closeQuietly(baos);
}
response.setStatus(Status.OK.getStatusCode()); response.setStatus(Status.OK.getStatusCode());
} }

View File

@ -375,7 +375,7 @@ public class TestGetHTTP {
// Use context service with a keystore and a truststore // Use context service with a keystore and a truststore
useSSLContextService(twoWaySslProperties); useSSLContextService(twoWaySslProperties);
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "10 secs");
controller.setProperty(GetHTTP.URL, destination); controller.setProperty(GetHTTP.URL, destination);
controller.setProperty(GetHTTP.FILENAME, "testFile"); controller.setProperty(GetHTTP.FILENAME, "testFile");
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");

View File

@ -20,7 +20,6 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -93,23 +92,25 @@ public class TestTransformXml {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("<data>\n"); builder.append("<data>\n");
InputStream in = new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv")); try(BufferedReader reader = new BufferedReader(new InputStreamReader(
BufferedReader reader = new BufferedReader(new InputStreamReader(in)); new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv"))))){
String line = null;
while ((line = reader.readLine()) != null) { String line = null;
builder.append(line).append("\n"); while ((line = reader.readLine()) != null) {
builder.append(line).append("\n");
}
builder.append("</data>");
String data = builder.toString();
runner.enqueue(data.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0);
final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1);
transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml"));
} }
builder.append("</data>");
String data = builder.toString();
runner.enqueue(data.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0);
final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1);
transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml"));
} }
} }

View File

@ -47,5 +47,9 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -298,27 +299,18 @@ public class DistributedMapCacheClientService extends AbstractControllerService
if (closed) { if (closed) {
throw new IllegalStateException("Client is closed"); throw new IllegalStateException("Client is closed");
} }
boolean tryToRequeue = true;
final CommsSession session = leaseCommsSession(); final CommsSession session = leaseCommsSession();
try { try {
return action.execute(session); return action.execute(session);
} catch (final IOException ioe) { } catch (final IOException ioe) {
try { tryToRequeue = false;
session.close();
} catch (final IOException ignored) {
}
throw ioe; throw ioe;
} finally { } finally {
if (!session.isClosed()) { if (tryToRequeue == true && this.closed == false) {
if (this.closed) { queue.offer(session);
try { } else {
session.close(); IOUtils.closeQuietly(session);
} catch (final IOException ioe) {
}
} else {
queue.offer(session);
}
} }
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -127,10 +128,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
try { try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
} catch (final HandshakeException e) { } catch (final HandshakeException e) {
try { IOUtils.closeQuietly(session);
session.close();
} catch (final IOException ioe) {
}
throw new IOException(e); throw new IOException(e);
} }
@ -162,9 +160,9 @@ public class DistributedSetCacheClientService extends AbstractControllerService
try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) { try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
dos.writeUTF("close"); dos.writeUTF("close");
dos.flush(); dos.flush();
commsSession.close();
} catch (final IOException e) { } catch (final IOException e) {
} }
IOUtils.closeQuietly(commsSession);
} }
if (logger.isDebugEnabled() && getIdentifier() != null) { if (logger.isDebugEnabled() && getIdentifier() != null) {
logger.debug("Closed {}", new Object[]{getIdentifier()}); logger.debug("Closed {}", new Object[]{getIdentifier()});
@ -185,6 +183,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
} }
final CommsSession session = leaseCommsSession(); final CommsSession session = leaseCommsSession();
boolean tryToRequeue = true;
try { try {
final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF(methodName); dos.writeUTF(methodName);
@ -198,22 +197,13 @@ public class DistributedSetCacheClientService extends AbstractControllerService
final DataInputStream dis = new DataInputStream(session.getInputStream()); final DataInputStream dis = new DataInputStream(session.getInputStream());
return dis.readBoolean(); return dis.readBoolean();
} catch (final IOException ioe) { } catch (final IOException ioe) {
try { tryToRequeue = false;
session.close();
} catch (final IOException ignored) {
}
throw ioe; throw ioe;
} finally { } finally {
if (!session.isClosed()) { if (tryToRequeue == true && this.closed == false) {
if (this.closed) { queue.offer(session);
try { } else {
session.close(); IOUtils.closeQuietly(session);
} catch (final IOException ioe) {
}
} else {
queue.offer(session);
}
} }
} }
} }