mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
684ed8e1f5
|
@ -276,6 +276,8 @@ public class RunNiFi {
|
|||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||
final String response = reader.readLine();
|
||||
logger.log(Level.FINE, "PING response: {0}", response);
|
||||
out.close();
|
||||
reader.close();
|
||||
|
||||
return PING_CMD.equals(response);
|
||||
} catch (final IOException ioe) {
|
||||
|
@ -425,6 +427,7 @@ public class RunNiFi {
|
|||
final Integer port = getCurrentPort();
|
||||
if (port == null) {
|
||||
System.out.println("Apache NiFi is not currently running");
|
||||
return;
|
||||
}
|
||||
|
||||
final Properties nifiProps = loadProperties();
|
||||
|
@ -442,6 +445,7 @@ public class RunNiFi {
|
|||
final OutputStream out = socket.getOutputStream();
|
||||
out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
final InputStream in = socket.getInputStream();
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||
|
@ -449,6 +453,7 @@ public class RunNiFi {
|
|||
while ((line = reader.readLine()) != null) {
|
||||
sb.append(line).append("\n");
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
|
||||
final String dump = sb.toString();
|
||||
|
@ -483,10 +488,12 @@ public class RunNiFi {
|
|||
final OutputStream out = socket.getOutputStream();
|
||||
out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
final InputStream in = socket.getInputStream();
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||
final String response = reader.readLine();
|
||||
reader.close();
|
||||
|
||||
logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response);
|
||||
|
||||
|
|
|
@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
|
||||
private int removedCount = 0; // number of flowfiles removed in this session
|
||||
private long removedBytes = 0L; // size of all flowfiles removed in this session
|
||||
private LongHolder bytesRead = new LongHolder(0L);
|
||||
private LongHolder bytesWritten = new LongHolder(0L);
|
||||
private final LongHolder bytesRead = new LongHolder(0L);
|
||||
private final LongHolder bytesWritten = new LongHolder(0L);
|
||||
private int flowFilesIn = 0, flowFilesOut = 0;
|
||||
private long contentSizeIn = 0L, contentSizeOut = 0L;
|
||||
private int writeRecursionLevel = 0;
|
||||
|
@ -139,11 +139,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
// maps a FlowFile to all Provenance Events that were generated for that FlowFile.
|
||||
// we do this so that if we generate a Fork event, for example, and then remove the event in the same
|
||||
// Session, we will not send that event to the Provenance Repository
|
||||
private Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
|
||||
private final Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
|
||||
|
||||
// when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent
|
||||
// so that we are able to aggregate many into a single Fork Event.
|
||||
private Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
|
||||
private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
|
||||
|
||||
private Checkpoint checkpoint = new Checkpoint();
|
||||
|
||||
|
@ -266,7 +266,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
if (claim != null) {
|
||||
context.getContentRepository().incrementClaimaintCount(claim);
|
||||
}
|
||||
newRecord.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
|
||||
newRecord.setWorking(clone, Collections.<String, String>emptyMap());
|
||||
|
||||
newRecord.setDestination(destination.getFlowFileQueue());
|
||||
newRecord.setTransferRelationship(record.getTransferRelationship());
|
||||
|
@ -1282,7 +1282,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
context.getContentRepository().incrementClaimaintCount(claim);
|
||||
}
|
||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||
record.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
|
||||
record.setWorking(clone, Collections.<String, String>emptyMap());
|
||||
records.put(clone, record);
|
||||
|
||||
if (offset == 0L && size == example.getSize()) {
|
||||
|
@ -1874,7 +1874,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
|
||||
try {
|
||||
currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
resetWriteClaims();
|
||||
throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
|
||||
}
|
||||
|
@ -1994,7 +1994,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
|
||||
// Get the current Content Claim from the record and see if we already have
|
||||
// an OutputStream that we can append to.
|
||||
ContentClaim oldClaim = record.getCurrentClaim();
|
||||
final ContentClaim oldClaim = record.getCurrentClaim();
|
||||
ByteCountingOutputStream outStream = appendableStreams.get(oldClaim);
|
||||
long originalByteWrittenCount = 0;
|
||||
|
||||
|
|
|
@ -1438,9 +1438,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
|
|||
trimmed = latestList;
|
||||
}
|
||||
|
||||
final Long maxEventId = getMaxEventId();
|
||||
Long maxEventId = getMaxEventId();
|
||||
if (maxEventId == null) {
|
||||
result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
|
||||
maxEventId = 0L;
|
||||
}
|
||||
Long minIndexedId = indexConfig.getMinIdIndexed();
|
||||
if (minIndexedId == null) {
|
||||
|
|
Loading…
Reference in New Issue