mirror of https://github.com/apache/nifi.git
NIFI-9847: Switched LifecycleState to use a WeakHashMap to track ActiveProcessSessionFactory instances (#5917)

* NIFI-9847: Switched LifecycleState to use a WeakHashMap to track ActiveProcessSessionFactory instances, instead of a regular Set that removed the instance after calling onTrigger. This was necessary for processors such as MergeRecord that may stash away an ActiveProcessSessionFactory for later use, as we need to be able to force rollback on processor termination.

* NIFI-9847: Fixed checkstyle violation
parent 0f8183dd95
commit 940fd8e81c
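The core idea in the commit message is to use java.util.WeakHashMap as a stand-in for a "weak set": tracked session factories can still be terminated later, but they are not pinned in memory once nothing else references them. A minimal, self-contained sketch of that pattern follows; the interface and class names here are illustrative placeholders, not NiFi's actual API.

    import java.util.Map;
    import java.util.WeakHashMap;

    // Illustrative stand-in for NiFi's ActiveProcessSessionFactory; not the real interface.
    interface SessionFactory {
        void terminateActiveSessions();
    }

    class FactoryTracker {
        // WeakHashMap keys are weakly referenced: once a factory is no longer strongly
        // reachable elsewhere, its entry can be garbage collected automatically.
        // The value is irrelevant; the map is used purely as a "WeakHashSet".
        private final Map<SessionFactory, Object> active = new WeakHashMap<>();

        synchronized void track(final SessionFactory factory) {
            active.put(factory, null);
        }

        // On termination, force-terminate every factory that is still reachable,
        // even one that a component stashed away long after onTrigger returned.
        synchronized void terminateAll() {
            for (final SessionFactory factory : active.keySet()) {
                factory.terminateActiveSessions();
            }
        }

        synchronized int size() {
            return active.size();
        }
    }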
@@ -1818,7 +1818,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 deactivateThread();
 }
 
-scheduleState.decrementActiveThreadCount(null);
+scheduleState.decrementActiveThreadCount();
 hasActiveThreads = false;
 scheduledState.set(ScheduledState.STOPPED);
 future.complete(null);
@@ -1329,11 +1329,11 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) {
 details.append("queue=")
 .append(repoRecord.getOriginalQueue().getIdentifier())
-.append("/");
+.append(", ");
 }
 details.append("filename=")
 .append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key()))
-.append("/uuid=")
+.append(", uuid=")
 .append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key()));
 }
 if (records.size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
@@ -1341,7 +1341,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 details.append(", ");
 }
 details.append(records.size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
-.append(" additional Flowfiles not listed");
+.append(" additional FlowFiles not listed");
 } else if (filesListed == 0) {
 details.append("none");
 }
@@ -1440,8 +1440,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 
 @Override
 public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
-verifyTaskActive();
-
 if (Objects.requireNonNull(newOwner) == this) {
 throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
 }
@@ -1457,188 +1455,200 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 migrate((StandardProcessSession) newOwner, flowFiles);
 }
 
-private void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
-// We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
-flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
-
-for (final FlowFile flowFile : flowFiles) {
-if (openInputStreams.containsKey(flowFile)) {
-throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
-+ "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
-}
-
-if (openOutputStreams.containsKey(flowFile)) {
-throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
-+ "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
-}
-
-if (readRecursionSet.containsKey(flowFile)) {
-throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
-}
-if (writeRecursionSet.contains(flowFile)) {
-throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
-}
-
-final StandardRepositoryRecord record = getRecord(flowFile);
-if (record == null) {
-throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
-}
-}
-
-// If we have a FORK event for one of the given FlowFiles, then all children must also be migrated. Otherwise, we
-// could have a case where we have FlowFile A transferred and eventually exiting the flow and later the 'newOwner'
-// ProcessSession is committed, claiming to have created FlowFiles from the parent, which is no longer even in
-// the flow. This would be very confusing when looking at the provenance for the FlowFile, so it is best to avoid this.
-final Set<String> flowFileIds = flowFiles.stream()
-.map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
-.collect(Collectors.toSet());
-
-for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
-final FlowFile eventFlowFile = entry.getKey();
-if (flowFiles.contains(eventFlowFile)) {
-final ProvenanceEventBuilder eventBuilder = entry.getValue();
-for (final String childId : eventBuilder.getChildFlowFileIds()) {
-if (!flowFileIds.contains(childId)) {
-throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
-+ " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
-}
-}
-} else {
-final ProvenanceEventBuilder eventBuilder = entry.getValue();
-for (final String childId : eventBuilder.getChildFlowFileIds()) {
-if (flowFileIds.contains(childId)) {
-throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, but the parent is not being migrated. "
-+ "If any FlowFile is forked, the parent and all children must be migrated at the same time.");
-}
-}
-}
-}
-
-// If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
-// event builder for the new owner of the FlowFile and remove the child from our fork event builder.
-final Set<FlowFile> forkedFlowFilesMigrated = new HashSet<>();
-for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
-final FlowFile eventFlowFile = entry.getKey();
-final ProvenanceEventBuilder eventBuilder = entry.getValue();
-
-// If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either.
-if (!flowFiles.contains(eventFlowFile)) {
-continue;
-}
-
-final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
-
-ProvenanceEventBuilder copy = null;
-for (final FlowFile flowFile : flowFiles) {
-final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
-if (childrenIds.contains(flowFileId)) {
-eventBuilder.removeChildFlowFile(flowFile);
-
-if (copy == null) {
-copy = eventBuilder.copy();
-copy.getChildFlowFileIds().clear();
-}
-copy.addChildFlowFile(flowFileId);
-}
-}
-
-if (copy != null) {
-newOwner.forkEventBuilders.put(eventFlowFile, copy);
-forkedFlowFilesMigrated.add(eventFlowFile);
-}
-}
-
-forkedFlowFilesMigrated.forEach(forkEventBuilders::remove);
-
-newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
-
-for (final FlowFile flowFile : flowFiles) {
-final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
-
-final long flowFileId = flowFile.getId();
-final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
-newOwner.records.put(flowFileId, repoRecord);
-
-final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
-if (linkedIds != null) {
-linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
-}
-
-// Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
-// We do not have to worry about accounting for 'input counts' on connections because those
-// are incremented only during a checkpoint, and anything that's been checkpointed has
-// also been committed above.
-final FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
-if (inputQueue != null) {
-final String connectionId = inputQueue.getIdentifier();
-incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
-newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
-
-unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
-newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet<>()).add(flowFileRecord);
-
-flowFilesIn--;
-contentSizeIn -= flowFile.getSize();
-
-newOwner.flowFilesIn++;
-newOwner.contentSizeIn += flowFile.getSize();
-}
-
-final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-if (removedFlowFiles.remove(flowFileUuid)) {
-newOwner.removedFlowFiles.add(flowFileUuid);
-newOwner.removedCount++;
-newOwner.removedBytes += flowFile.getSize();
-
-removedCount--;
-removedBytes -= flowFile.getSize();
-}
-
-if (createdFlowFiles.remove(flowFileUuid)) {
-newOwner.createdFlowFiles.add(flowFileUuid);
-}
-
-if (repoRecord.getTransferRelationship() != null) {
-final Relationship transferRelationship = repoRecord.getTransferRelationship();
-final Collection<Connection> destinations = context.getConnections(transferRelationship);
-final int numDestinations = destinations.size();
-final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
-
-if (autoTerminated) {
-removedCount--;
-removedBytes -= flowFile.getSize();
-
-newOwner.removedCount++;
-newOwner.removedBytes += flowFile.getSize();
-} else {
-flowFilesOut--;
-contentSizeOut -= flowFile.getSize();
-
-newOwner.flowFilesOut++;
-newOwner.contentSizeOut += flowFile.getSize();
-}
-}
-
-final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
-if (events != null) {
-newOwner.generatedProvenanceEvents.put(flowFile, events);
-}
-
-final ContentClaim currentClaim = repoRecord.getCurrentClaim();
-if (currentClaim != null) {
-final ByteCountingOutputStream appendableStream = appendableStreams.remove(currentClaim);
-if (appendableStream != null) {
-newOwner.appendableStreams.put(currentClaim, appendableStream);
-}
-}
-
-final Path toDelete = deleteOnCommit.remove(flowFile);
-if (toDelete != null) {
-newOwner.deleteOnCommit.put(flowFile, toDelete);
-}
-}
-
-provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
-}
+private synchronized void migrate(final StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
+// This method will update many member variables/internal state of both `this` and `newOwner`. These member variables may also be updated during
+// session rollback, such as when a Processor is terminated. As such, we need to ensure that we synchronize on both `this` and `newOwner` so that
+// neither can be rolled back while we are in the process of migrating FlowFiles from one session to another.
+//
+// We must also ensure that we verify that both sessions are in an amenable state to perform this transference after obtaining the synchronization lock.
+// We synchronize on 'this' by marking the method synchronized. Because the only way in which one Process Session will call into another is via this migrate() method,
+// we do not need to worry about the order in which the synchronized lock is obtained.
+synchronized (newOwner) {
+verifyTaskActive();
+newOwner.verifyTaskActive();
+
+// We don't call validateRecordState() here because we want to allow migration of FlowFiles that have already been marked as removed or transferred, etc.
+flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+for (final FlowFile flowFile : flowFiles) {
+if (openInputStreams.containsKey(flowFile)) {
+throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
++ "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+}
+
+if (openOutputStreams.containsKey(flowFile)) {
+throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
++ "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
+}
+
+if (readRecursionSet.containsKey(flowFile)) {
+throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
+}
+if (writeRecursionSet.contains(flowFile)) {
+throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+}
+
+final StandardRepositoryRecord record = getRecord(flowFile);
+if (record == null) {
+throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
+}
+}
+
+// If we have a FORK event for one of the given FlowFiles, then all children must also be migrated. Otherwise, we
+// could have a case where we have FlowFile A transferred and eventually exiting the flow and later the 'newOwner'
+// ProcessSession is committed, claiming to have created FlowFiles from the parent, which is no longer even in
+// the flow. This would be very confusing when looking at the provenance for the FlowFile, so it is best to avoid this.
+final Set<String> flowFileIds = flowFiles.stream()
+.map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
+.collect(Collectors.toSet());
+
+for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+final FlowFile eventFlowFile = entry.getKey();
+if (flowFiles.contains(eventFlowFile)) {
+final ProvenanceEventBuilder eventBuilder = entry.getValue();
+for (final String childId : eventBuilder.getChildFlowFileIds()) {
+if (!flowFileIds.contains(childId)) {
+throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
++ " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
+}
+}
+} else {
+final ProvenanceEventBuilder eventBuilder = entry.getValue();
+for (final String childId : eventBuilder.getChildFlowFileIds()) {
+if (flowFileIds.contains(childId)) {
+throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, " +
+"but the parent is not being migrated. If any FlowFile is forked, the parent and all children must be migrated at the same time.");
+}
+}
+}
+}
+
+// If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
+// event builder for the new owner of the FlowFile and remove the child from our fork event builder.
+final Set<FlowFile> forkedFlowFilesMigrated = new HashSet<>();
+for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
+final FlowFile eventFlowFile = entry.getKey();
+final ProvenanceEventBuilder eventBuilder = entry.getValue();
+
+// If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either.
+if (!flowFiles.contains(eventFlowFile)) {
+continue;
+}
+
+final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
+
+ProvenanceEventBuilder copy = null;
+for (final FlowFile flowFile : flowFiles) {
+final String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
+if (childrenIds.contains(flowFileId)) {
+eventBuilder.removeChildFlowFile(flowFile);
+
+if (copy == null) {
+copy = eventBuilder.copy();
+copy.getChildFlowFileIds().clear();
+}
+copy.addChildFlowFile(flowFileId);
+}
+}
+
+if (copy != null) {
+newOwner.forkEventBuilders.put(eventFlowFile, copy);
+forkedFlowFilesMigrated.add(eventFlowFile);
+}
+}
+
+forkedFlowFilesMigrated.forEach(forkEventBuilders::remove);
+
+newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
+
+for (final FlowFile flowFile : flowFiles) {
+final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
+
+final long flowFileId = flowFile.getId();
+final StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
+newOwner.records.put(flowFileId, repoRecord);
+
+final Collection<Long> linkedIds = this.flowFileLinkage.remove(flowFileId);
+if (linkedIds != null) {
+linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, linkedId));
+}
+
+// Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
+// We do not have to worry about accounting for 'input counts' on connections because those
+// are incremented only during a checkpoint, and anything that's been checkpointed has
+// also been committed above.
+final FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
+if (inputQueue != null) {
+final String connectionId = inputQueue.getIdentifier();
+incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
+newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
+
+unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
+newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet<>()).add(flowFileRecord);
+
+flowFilesIn--;
+contentSizeIn -= flowFile.getSize();
+
+newOwner.flowFilesIn++;
+newOwner.contentSizeIn += flowFile.getSize();
+}
+
+final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+if (removedFlowFiles.remove(flowFileUuid)) {
+newOwner.removedFlowFiles.add(flowFileUuid);
+newOwner.removedCount++;
+newOwner.removedBytes += flowFile.getSize();
+
+removedCount--;
+removedBytes -= flowFile.getSize();
+}
+
+if (createdFlowFiles.remove(flowFileUuid)) {
+newOwner.createdFlowFiles.add(flowFileUuid);
+}
+
+if (repoRecord.getTransferRelationship() != null) {
+final Relationship transferRelationship = repoRecord.getTransferRelationship();
+final Collection<Connection> destinations = context.getConnections(transferRelationship);
+final int numDestinations = destinations.size();
+final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
+
+if (autoTerminated) {
+removedCount--;
+removedBytes -= flowFile.getSize();
+
+newOwner.removedCount++;
+newOwner.removedBytes += flowFile.getSize();
+} else {
+flowFilesOut--;
+contentSizeOut -= flowFile.getSize();
+
+newOwner.flowFilesOut++;
+newOwner.contentSizeOut += flowFile.getSize();
+}
+}
+
+final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
+if (events != null) {
+newOwner.generatedProvenanceEvents.put(flowFile, events);
+}
+
+final ContentClaim currentClaim = repoRecord.getCurrentClaim();
+if (currentClaim != null) {
+final ByteCountingOutputStream appendableStream = appendableStreams.remove(currentClaim);
+if (appendableStream != null) {
+newOwner.appendableStreams.put(currentClaim, appendableStream);
+}
+}
+
+final Path toDelete = deleteOnCommit.remove(flowFile);
+if (toDelete != null) {
+newOwner.deleteOnCommit.put(flowFile, toDelete);
+}
+}
+
+provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
+}
+}
 
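The comment block added in the hunk above describes the locking scheme: migrate() is synchronized on the source session, then explicitly synchronizes on the target session, and only re-verifies both sessions once both locks are held, so neither can be rolled back mid-migration. A stripped-down sketch of that shape, using hypothetical names rather than the real StandardProcessSession fields:

    import java.util.ArrayList;
    import java.util.List;

    class Session {
        private final List<String> items = new ArrayList<>();
        private volatile boolean active = true;

        void add(final String item) {
            items.add(item);
        }

        void terminate() {
            active = false;
        }

        private void verifyActive() {
            if (!active) {
                throw new IllegalStateException("Session is no longer active");
            }
        }

        // Synchronized on 'this' (the source), then lock the target. The commit's comments
        // note that migrate() is the only way one session calls into another, which is why
        // lock-acquisition order is not a concern in that usage.
        synchronized void migrateTo(final Session newOwner) {
            synchronized (newOwner) {
                // Re-verify both sides only after both locks are held, so neither session
                // can be terminated or rolled back while state is being moved.
                verifyActive();
                newOwner.verifyActive();

                newOwner.items.addAll(items);
                items.clear();
            }
        }
    }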
@@ -1793,11 +1803,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 flowFilesIn++;
 contentSizeIn += flowFile.getSize();
 
-Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection.getFlowFileQueue());
-if (set == null) {
-set = new HashSet<>();
-unacknowledgedFlowFiles.put(connection.getFlowFileQueue(), set);
-}
+final Set<FlowFileRecord> set = unacknowledgedFlowFiles.computeIfAbsent(connection.getFlowFileQueue(), k -> new HashSet<>());
 set.add(flowFile);
 
 incrementConnectionOutputCounts(connection, flowFile);
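The change above collapses the get / null-check / put dance into Map.computeIfAbsent, which creates and stores the set only when the key has no mapping yet. The same idiom in isolation, with hypothetical map contents rather than NiFi types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class ComputeIfAbsentExample {
        public static void main(final String[] args) {
            final Map<String, Set<String>> unacknowledged = new HashMap<>();

            // Before: lookup, null check, create, and put as four separate steps.
            Set<String> set = unacknowledged.get("queue-1");
            if (set == null) {
                set = new HashSet<>();
                unacknowledged.put("queue-1", set);
            }
            set.add("flowfile-a");

            // After: one call; the mapping function runs only when the key is absent.
            unacknowledged.computeIfAbsent("queue-2", k -> new HashSet<>()).add("flowfile-b");

            System.out.println(unacknowledged);
        }
    }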
@@ -23,7 +23,9 @@ import org.apache.nifi.processor.exception.TerminatedTaskException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,7 +38,7 @@ public class LifecycleState {
 private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
 private volatile long lastStopTime = -1;
 private volatile boolean terminated = false;
-private final Set<ActiveProcessSessionFactory> activeProcessSessionFactories = Collections.synchronizedSet(new HashSet<>());
+private final Map<ActiveProcessSessionFactory, Object> activeProcessSessionFactories = new WeakHashMap<>();
 
 public synchronized int incrementActiveThreadCount(final ActiveProcessSessionFactory sessionFactory) {
 if (terminated) {
@@ -44,21 +46,29 @@ public class LifecycleState {
 }
 
 if (sessionFactory != null) {
-activeProcessSessionFactories.add(sessionFactory);
+// If a session factory is provided, add it to our WeakHashMap. The value that we use is not relevant,
+// as this just serves, essentially, as a WeakHashSet, but there is no WeakHashSet implementation.
+// We need to keep track of any ActiveProcessSessionFactory that has been created for this component,
+// as long as the session factory has not been garbage collected. This is important because when we offload
+// a node, we will terminate all active processors and we need the ability to terminate any active sessions
+// at that time. We cannot simply store a Set of all ActiveProcessSessionFactories and then remove them in the
+// decrementActiveThreadCount because a Processor may choose to continue using the ProcessSessionFactory even after
+// returning from its onTrigger method.
+//
+// For example, it may stash the ProcessSessionFactory away in a member variable in order to aggregate FlowFiles across
+// many onTrigger invocations. In this case, we need the ability to force the rollback of any created session upon Processor
+// termination.
+activeProcessSessionFactories.put(sessionFactory, null);
 }
 
 return activeThreadCount.incrementAndGet();
 }
 
-public synchronized int decrementActiveThreadCount(final ActiveProcessSessionFactory sessionFactory) {
+public synchronized int decrementActiveThreadCount() {
 if (terminated) {
 return activeThreadCount.get();
 }
 
-if (sessionFactory != null) {
-activeProcessSessionFactories.remove(sessionFactory);
-}
-
 return activeThreadCount.decrementAndGet();
 }
 
@@ -85,8 +95,7 @@ public class LifecycleState {
 
 @Override
 public String toString() {
-return new StringBuilder().append("activeThreads:").append(activeThreadCount.get()).append("; ")
-.append("scheduled:").append(scheduled.get()).append("; ").toString();
+return "LifecycleState[activeThreads= " + activeThreadCount.get() + ", scheduled=" + scheduled.get() + "]";
 }
 
 /**
@@ -123,7 +132,8 @@ public class LifecycleState {
 this.terminated = true;
 activeThreadCount.set(0);
 
-for (final ActiveProcessSessionFactory factory : activeProcessSessionFactories) {
+// Terminate any active sessions.
+for (final ActiveProcessSessionFactory factory : activeProcessSessionFactories.keySet()) {
 factory.terminateActiveSessions();
 }
 }
@@ -43,4 +43,9 @@ public class FlowFileQueueContents {
 public QueueSize getSwapSize() {
 return swapSize;
 }
+
+@Override
+public String toString() {
+return "FlowFileQueueContents[swapLocations=" + swapLocations + ", swapSize=" + swapSize + ", activeFlowFiles=" + activeFlowFiles + "]";
+}
 }
@@ -501,7 +501,12 @@ public class SwappablePriorityQueue {
 }
 
 public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
-logger.trace("{} Acknowledging {}", this, flowFiles);
+if (logger.isTraceEnabled()) {
+for (final FlowFileRecord flowFile : flowFiles) {
+logger.trace("{} Acknowledging {}", this, flowFile);
+}
+}
+
 final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
 incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
 }
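The acknowledge() change above, and the poll() and offload changes in the hunks below, all apply the same pattern: wrap per-element trace/debug output in an isTraceEnabled()/isDebugEnabled() check so the collection is only iterated, and its elements only stringified, when that log level is actually on. A generic SLF4J sketch of the pattern; the logger name and items here are placeholders, not NiFi code:

    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class GuardedTraceLogging {
        private static final Logger logger = LoggerFactory.getLogger(GuardedTraceLogging.class);

        static void acknowledge(final List<String> flowFiles) {
            // Guarding first avoids iterating the collection and formatting messages
            // at all when TRACE is disabled, which is the common case in production.
            if (logger.isTraceEnabled()) {
                for (final String flowFile : flowFiles) {
                    logger.trace("Acknowledging {}", flowFile);
                }
            }
        }

        public static void main(final String[] args) {
            acknowledge(List.of("flowfile-1", "flowfile-2"));
        }
    }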
@@ -627,8 +632,10 @@ public class SwappablePriorityQueue {
 writeLock.unlock("poll(int, Set)");
 }
 
-if (!records.isEmpty()) {
-logger.trace("{} poll() returning {}", this, records);
+if (!records.isEmpty() && logger.isTraceEnabled()) {
+for (final FlowFileRecord flowFile : records) {
+logger.trace("{} poll() returning {}", this, flowFile);
+}
 }
 
 return records;
@@ -690,8 +697,10 @@ public class SwappablePriorityQueue {
 this.activeQueue.addAll(unselected);
 incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
 
-if (!selectedFlowFiles.isEmpty()) {
-logger.trace("{} poll() returning {}", this, selectedFlowFiles);
+if (!selectedFlowFiles.isEmpty() && logger.isTraceEnabled()) {
+for (final FlowFileRecord flowFile : selectedFlowFiles) {
+logger.trace("{} poll() returning {}", this, flowFile);
+}
 }
 
 return selectedFlowFiles;
@@ -244,7 +244,10 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
 return;
 }
 
-logger.debug("Setting queue {} on node {} as offloaded", this, clusterCoordinator.getLocalNodeIdentifier());
+if (logger.isDebugEnabled()) {
+logger.debug("Setting queue {} on node {} as offloaded. Current size: {}, Partition Sizes: {}", this, clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes());
+}
+
 offloaded = true;
 
 partitionWriteLock.lock();
@@ -271,11 +274,30 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
 
 // Update our partitioner so that we don't keep any data on the local partition
 setFlowFilePartitioner(new NonLocalPartitionPartitioner());
+
+if (logger.isDebugEnabled()) {
+logger.debug("Queue {} has now updated Partition on node {} for offload. Current size: {}, Partition Sizes: {}",
+this, clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes());
+}
 } finally {
 partitionWriteLock.unlock();
 }
 }
 
+private Map<QueuePartition, QueueSize> getPartitionSizes() {
+partitionReadLock.lock();
+try {
+final Map<QueuePartition, QueueSize> sizeMap = new HashMap<>();
+for (final QueuePartition partition : queuePartitions) {
+sizeMap.put(partition, partition.size());
+}
+
+return sizeMap;
+} finally {
+partitionReadLock.unlock();
+}
+}
+
 @Override
 public void resetOffloadedQueue() {
 if (clusterCoordinator == null) {
@@ -899,7 +921,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
 final List<FlowFileRecord> flowFileList = (flowFiles instanceof List) ? (List<FlowFileRecord>) flowFiles : new ArrayList<>(flowFiles);
 partitionMap = Collections.singletonMap(partition, flowFileList);
 
-logger.debug("Partitioner is static so Partitioned FlowFiles as: {}", partitionMap);
+logger.debug("Partitioner {} is static so Partitioned FlowFiles as: {}", partitioner, partitionMap);
 return partitionMap;
 }
 
@@ -30,6 +30,8 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,7 +41,9 @@ import java.util.Optional;
 import java.util.Set;
 
 public class StandardRebalancingPartition implements RebalancingPartition {
-private final String SWAP_PARTITION_NAME = "rebalance";
+private static final Logger logger = LoggerFactory.getLogger(StandardRebalancingPartition.class);
+private static final String SWAP_PARTITION_NAME = "rebalance";
+
 private final String queueIdentifier;
 private final BlockingSwappablePriorityQueue queue;
 private final LoadBalancedFlowFileQueue flowFileQueue;
@@ -127,11 +131,13 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 
 private synchronized void rebalanceFromQueue() {
 if (stopped) {
+logger.debug("Will not rebalance from queue because {} is stopped", this);
 return;
 }
 
 // If a task is already defined, do nothing. There's already a thread running.
 if (rebalanceTask != null) {
+logger.debug("Rebalance Task already exists for {}", this);
 return;
 }
 
@@ -140,6 +146,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 final Thread rebalanceThread = new Thread(this.rebalanceTask);
 rebalanceThread.setName("Rebalance queued data for Connection " + queueIdentifier);
 rebalanceThread.start();
+logger.debug("No Rebalance Task currently exists for {}. Starting new Rebalance Thread {}", this, rebalanceThread);
 }
 
 @Override
@@ -148,12 +155,16 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 return;
 }
 
+logger.debug("Adding {} to Rebalance queue for {}", queueContents, this);
+
 queue.inheritQueueContents(queueContents);
 rebalanceFromQueue();
 }
 
 @Override
 public void rebalance(final Collection<FlowFileRecord> flowFiles) {
+logger.debug("Adding {} to Rebalance queue for {}", flowFiles, this);
+
 queue.putAll(flowFiles);
 rebalanceFromQueue();
 }
@@ -163,7 +174,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 return queue.packageForRebalance(newPartitionName);
 }
 
-private synchronized boolean complete() {
+private synchronized boolean isComplete() {
 if (!queue.isEmpty()) {
 return false;
 }
@@ -201,7 +212,8 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 if (polled == null) {
 flowFileQueue.handleExpiredRecords(expiredRecords);
 
-if (complete()) {
+if (isComplete()) {
+logger.debug("Rebalance Task completed for {}", this);
 return;
 } else {
 continue;
@@ -217,6 +229,8 @@ public class StandardRebalancingPartition implements RebalancingPartition {
 
 flowFileQueue.handleExpiredRecords(expiredRecords);
 
+logger.debug("{} Rebalancing {}", this, toDistribute);
+
 // Transfer all of the FlowFiles that we got back to the FlowFileQueue itself. This will cause the data to be
 // re-partitioned and binned appropriately. We also then need to ensure that we acknowledge the data from our
 // own SwappablePriorityQueue to ensure that the sizes are kept in check.
@@ -316,7 +316,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
 // result in using more than the maximum number of defined threads
-scheduleState.decrementActiveThreadCount(sessionFactory);
+scheduleState.decrementActiveThreadCount();
 return;
 }
 
@@ -344,7 +344,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 }
 }
 
-scheduleState.decrementActiveThreadCount(sessionFactory);
+scheduleState.decrementActiveThreadCount();
 }
 }
 
@@ -357,7 +357,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
 // result in using more than the maximum number of defined threads
-scheduleState.decrementActiveThreadCount(sessionFactory);
+scheduleState.decrementActiveThreadCount();
 return;
 }
 
@@ -386,7 +386,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 }
 }
 
-scheduleState.decrementActiveThreadCount(sessionFactory);
+scheduleState.decrementActiveThreadCount();
 }
 }
 }
@@ -343,7 +343,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
 @Override
 public void onTaskComplete() {
-lifecycleState.decrementActiveThreadCount(null);
+lifecycleState.decrementActiveThreadCount();
 }
 };
 
@@ -383,7 +383,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
 @Override
 public void onTaskComplete() {
-lifecycleState.decrementActiveThreadCount(null);
+lifecycleState.decrementActiveThreadCount();
 }
 };
 
@@ -283,7 +283,7 @@ public class ConnectableTask {
 logger.error("", e);
 }
 } finally {
-scheduleState.decrementActiveThreadCount(activeSessionFactory);
+scheduleState.decrementActiveThreadCount();
 Thread.currentThread().setName(originalThreadName);
 }
 }
@@ -58,7 +58,7 @@ public class ReportingTaskWrapper implements Runnable {
 }
 }
 } finally {
-lifecycleState.decrementActiveThreadCount(null);
+lifecycleState.decrementActiveThreadCount();
 }
 }
 }
@@ -88,7 +88,7 @@ public class StatelessSchedulingAgent implements SchedulingAgent {
 try {
 taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
 } finally {
-scheduleState.decrementActiveThreadCount(null);
+scheduleState.decrementActiveThreadCount();
 }
 
 } catch (final Throwable t) {