NIFI-1754 Rollback log messages should include the flowfile filename and UUID to assist in flow management. Incorporates debug logging into StandardProcessSession.rollback() to list Flowfile records retreived from the session.

Reviewed with slight style amendment (see Jira for details) by Tony Kurc (tkurc@apache.org). This closes #478.
This commit is contained in:
Joe Skora 2016-05-25 15:51:02 -04:00 committed by trkurc
parent f47af1ce83
commit c6af6be44c
1 changed files with 44 additions and 0 deletions

View File

@ -99,6 +99,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class); private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims"); private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims");
private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>(); private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
@ -861,6 +862,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
private void rollback(final boolean penalize, final boolean rollbackCheckpoint) { private void rollback(final boolean penalize, final boolean rollbackCheckpoint) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} session rollback called, FlowFile records are {} {}",
this, loggableFlowfileInfo(), new Throwable("Stack Trace on rollback"));
}
deleteOnCommit.clear(); deleteOnCommit.clear();
final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>(); final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
@ -950,6 +956,44 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
resetState(); resetState();
} }
private String loggableFlowfileInfo() {
final StringBuilder details = new StringBuilder(1024).append("[");
final int initLen = details.length();
int filesListed = 0;
for (Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) {
if (filesListed >= MAX_ROLLBACK_FLOWFILES_TO_LOG) {
break;
}
filesListed++;
final FlowFileRecord entryKey = entry.getKey();
final StandardRepositoryRecord entryValue = entry.getValue();
if (details.length() > initLen) {
details.append(", ");
}
if (entryValue.getOriginalQueue() != null && entryValue.getOriginalQueue().getIdentifier() != null) {
details.append("queue=")
.append(entryValue.getOriginalQueue().getIdentifier())
.append("/");
}
details.append("filename=")
.append(entryKey.getAttribute(CoreAttributes.FILENAME.key()))
.append("/uuid=")
.append(entryKey.getAttribute(CoreAttributes.UUID.key()));
}
if (records.entrySet().size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
if (details.length() > initLen) {
details.append(", ");
}
details.append(records.entrySet().size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
.append(" additional Flowfiles not listed");
} else if (filesListed == 0) {
details.append("none");
}
details.append("]");
return details.toString();
}
private void removeContent(final ContentClaim claim) { private void removeContent(final ContentClaim claim) {
if (claim == null) { if (claim == null) {
return; return;