NIFI-1222: Session.adjustCounter keeps track of local and global counters; it then call processContext.adjustCounter with each of them, but ProcessContext was changed a while back to automatically increment both 'lcoal' and 'global' counters each time, so our numbers are doubled; removed the 'localCounters' and 'globalCounters' from StandardProcessSession and replaced with just 'counters'

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2015-11-25 16:11:11 -05:00 committed by joewitt
parent a29b7b3bf0
commit 5061e5fa0a
1 changed files with 6 additions and 15 deletions

View File

@ -103,8 +103,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>(); private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
private final Map<String, Long> localCounters = new HashMap<>(); private final Map<String, Long> counters = new HashMap<>();
private final Map<String, Long> globalCounters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>(); private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context; private final ProcessContext context;
private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
@ -397,11 +396,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
} }
for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) { for (final Map.Entry<String, Long> entry : checkpoint.counters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
}
for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true); adjustCounter(entry.getKey(), entry.getValue(), true);
} }
@ -993,8 +988,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
connectionCounts.clear(); connectionCounts.clear();
createdFlowFiles.clear(); createdFlowFiles.clear();
removedFlowFiles.clear(); removedFlowFiles.clear();
globalCounters.clear(); counters.clear();
localCounters.clear();
generatedProvenanceEvents.clear(); generatedProvenanceEvents.clear();
forkEventBuilders.clear(); forkEventBuilders.clear();
@ -1167,8 +1161,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return; return;
} }
adjustCounter(name, delta, localCounters); adjustCounter(name, delta, counters);
adjustCounter(name, delta, globalCounters);
} }
private void adjustCounter(final String name, final long delta, final Map<String, Long> map) { private void adjustCounter(final String name, final long delta, final Map<String, Long> map) {
@ -2554,8 +2547,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>(); private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
private final Map<String, Long> localCounters = new HashMap<>(); private final Map<String, Long> counters = new HashMap<>();
private final Map<String, Long> globalCounters = new HashMap<>();
private final Set<Path> deleteOnCommit = new HashSet<>(); private final Set<Path> deleteOnCommit = new HashSet<>();
private final Set<String> removedFlowFiles = new HashSet<>(); private final Set<String> removedFlowFiles = new HashSet<>();
@ -2581,8 +2573,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.records.putAll(session.records); this.records.putAll(session.records);
this.connectionCounts.putAll(session.connectionCounts); this.connectionCounts.putAll(session.connectionCounts);
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles); this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
this.localCounters.putAll(session.localCounters); this.counters.putAll(session.counters);
this.globalCounters.putAll(session.globalCounters);
this.deleteOnCommit.addAll(session.deleteOnCommit); this.deleteOnCommit.addAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles); this.removedFlowFiles.addAll(session.removedFlowFiles);