From fea59e3249324caa37d14c3a3366321e3f2f8242 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sun, 3 May 2015 16:42:54 -0400 Subject: [PATCH 1/3] NIFI-572: Do not set UUID as being modified when creating a clone --- .../repository/StandardProcessSession.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index d3b0690bfc..2c032d357f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private int removedCount = 0; // number of flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session - private LongHolder bytesRead = new LongHolder(0L); - private LongHolder bytesWritten = new LongHolder(0L); + private final LongHolder bytesRead = new LongHolder(0L); + private final LongHolder bytesWritten = new LongHolder(0L); private int flowFilesIn = 0, flowFilesOut = 0; private long contentSizeIn = 0L, contentSizeOut = 0L; private int writeRecursionLevel = 0; @@ -139,11 +139,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // maps a FlowFile to all Provenance Events that were generated for that FlowFile. // we do this so that if we generate a Fork event, for example, and then remove the event in the same // Session, we will not send that event to the Provenance Repository - private Map> generatedProvenanceEvents = new HashMap<>(); + private final Map> generatedProvenanceEvents = new HashMap<>(); // when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent // so that we are able to aggregate many into a single Fork Event. - private Map forkEventBuilders = new HashMap<>(); + private final Map forkEventBuilders = new HashMap<>(); private Checkpoint checkpoint = new Checkpoint(); @@ -266,7 +266,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (claim != null) { context.getContentRepository().incrementClaimaintCount(claim); } - newRecord.setWorking(clone, CoreAttributes.UUID.key(), newUuid); + newRecord.setWorking(clone, Collections.emptyMap()); newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setTransferRelationship(record.getTransferRelationship()); @@ -1282,7 +1282,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE context.getContentRepository().incrementClaimaintCount(claim); } final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(clone, CoreAttributes.UUID.key(), newUuid); + record.setWorking(clone, Collections.emptyMap()); records.put(clone, record); if (offset == 0L && size == example.getSize()) { @@ -1874,7 +1874,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim); - } catch (IOException e) { + } catch (final IOException e) { resetWriteClaims(); throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e); } @@ -1994,7 +1994,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // Get the current Content Claim from the record and see if we already have // an OutputStream that we can append to. - ContentClaim oldClaim = record.getCurrentClaim(); + final ContentClaim oldClaim = record.getCurrentClaim(); ByteCountingOutputStream outStream = appendableStreams.get(oldClaim); long originalByteWrittenCount = 0; From 018473b352ef5216e8581fa843b1a3a958c51d9b Mon Sep 17 00:00:00 2001 From: Mark Latimer Date: Sat, 2 May 2015 12:18:51 +0100 Subject: [PATCH 2/3] NIFI-572 possible null dereference in PersistentProvenanceRepository.submitQuery Signed-off-by: Mark Payne --- .../apache/nifi/provenance/PersistentProvenanceRepository.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 27f2cbbbc7..5da5d6fe42 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -1438,9 +1438,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository trimmed = latestList; } - final Long maxEventId = getMaxEventId(); + Long maxEventId = getMaxEventId(); if (maxEventId == null) { result.getResult().update(Collections.emptyList(), 0L); + maxEventId = 0L; } Long minIndexedId = indexConfig.getMinIdIndexed(); if (minIndexedId == null) { From 22ded807be8a81f09b618af4c79a9b2d8ee7496f Mon Sep 17 00:00:00 2001 From: Mark Latimer Date: Sat, 2 May 2015 12:47:29 +0100 Subject: [PATCH 3/3] NIFI-573 fix NPE in nifi.sh and clean up open streams Signed-off-by: Mark Payne --- .../src/main/java/org/apache/nifi/bootstrap/RunNiFi.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index d3796b5c9f..bb83e3d4b3 100644 --- a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -276,6 +276,8 @@ public class RunNiFi { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); logger.log(Level.FINE, "PING response: {0}", response); + out.close(); + reader.close(); return PING_CMD.equals(response); } catch (final IOException ioe) { @@ -425,6 +427,7 @@ public class RunNiFi { final Integer port = getCurrentPort(); if (port == null) { System.out.println("Apache NiFi is not currently running"); + return; } final Properties nifiProps = loadProperties(); @@ -442,6 +445,7 @@ public class RunNiFi { final OutputStream out = socket.getOutputStream(); out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); + out.close(); final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); @@ -449,6 +453,7 @@ public class RunNiFi { while ((line = reader.readLine()) != null) { sb.append(line).append("\n"); } + reader.close(); } final String dump = sb.toString(); @@ -483,10 +488,12 @@ public class RunNiFi { final OutputStream out = socket.getOutputStream(); out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); + out.close(); final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); + reader.close(); logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response);