From c6af6be44cc64b851d8d1c3888e6d2c3f9bf6e05 Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Wed, 25 May 2016 15:51:02 -0400 Subject: [PATCH] NIFI-1754 Rollback log messages should include the flowfile filename and UUID to assist in flow management. Incorporates debug logging into StandardProcessSession.rollback() to list Flowfile records retreived from the session. Reviewed with slight style amendment (see Jira for details) by Tony Kurc (tkurc@apache.org). This closes #478. --- .../repository/StandardProcessSession.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index cb789e289b..0071fedc71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -99,6 +99,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class); private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims"); + private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5; private final Map records = new HashMap<>(); private final Map connectionCounts = new HashMap<>(); @@ -861,6 +862,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void rollback(final boolean penalize, final boolean rollbackCheckpoint) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} session rollback called, FlowFile records are {} {}", + this, loggableFlowfileInfo(), new Throwable("Stack Trace on rollback")); + } + deleteOnCommit.clear(); final Set recordsToHandle = new HashSet<>(); @@ -950,6 +956,44 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetState(); } + private String loggableFlowfileInfo() { + final StringBuilder details = new StringBuilder(1024).append("["); + final int initLen = details.length(); + int filesListed = 0; + for (Map.Entry entry : records.entrySet()) { + if (filesListed >= MAX_ROLLBACK_FLOWFILES_TO_LOG) { + break; + } + filesListed++; + final FlowFileRecord entryKey = entry.getKey(); + final StandardRepositoryRecord entryValue = entry.getValue(); + if (details.length() > initLen) { + details.append(", "); + } + if (entryValue.getOriginalQueue() != null && entryValue.getOriginalQueue().getIdentifier() != null) { + details.append("queue=") + .append(entryValue.getOriginalQueue().getIdentifier()) + .append("/"); + } + details.append("filename=") + .append(entryKey.getAttribute(CoreAttributes.FILENAME.key())) + .append("/uuid=") + .append(entryKey.getAttribute(CoreAttributes.UUID.key())); + } + if (records.entrySet().size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) { + if (details.length() > initLen) { + details.append(", "); + } + details.append(records.entrySet().size() - MAX_ROLLBACK_FLOWFILES_TO_LOG) + .append(" additional Flowfiles not listed"); + } else if (filesListed == 0) { + details.append("none"); + } + details.append("]"); + return details.toString(); + } + + private void removeContent(final ContentClaim claim) { if (claim == null) { return;