diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java index 5757e59567..a3ed241634 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java @@ -89,10 +89,12 @@ public class RecordWriterLease { if (writer.getBytesWritten() >= maxBytes) { return RolloverState.MAX_BYTES_REACHED; } - if (writer.getRecordsWritten() >= maxEvents) { + + final int recordsWritten = writer.getRecordsWritten(); + if (recordsWritten >= maxEvents) { return RolloverState.MAX_EVENTS_REACHED; } - if (System.currentTimeMillis() >= maxSystemTime) { + if (recordsWritten > 0 && System.currentTimeMillis() >= maxSystemTime) { return RolloverState.MAX_TIME_REACHED; } @@ -123,4 +125,11 @@ public class RecordWriterLease { } } } + + public String toString() { + // Call super.toString() so that we have a unique hash/address added to the toString, but also include the file being written to. + // When comparing the toString() of two different leases, this helps to compare whether or not the two leases are the same object + // as well as whether or not they point to the same file. + return super.toString() + "[" + writer.getFile() + "]"; + } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 9d805f9488..1df84f6d41 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -179,14 +179,20 @@ public class WriteAheadStorePartition implements EventStorePartition { while (true) { lease = getLease(); if (lease.tryClaim()) { + logger.trace("Obtained claim on Lease {}", lease); break; } final RolloverState rolloverState = lease.getRolloverState(); if (rolloverState.isRollover()) { + logger.trace("Failed to obtain claim on Lease {}; Rollover State = {}", lease, rolloverState); + final boolean success = tryRollover(lease); if (success) { - logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState); + final RecordWriter writer = lease.getWriter(); + final String eventFileSize = FormatUtils.formatDataSize(writer.getBytesWritten()); + logger.info("Successfully rolled over Event Writer for {} due to {}. Event File was {} and contained {} events.", this, rolloverState, eventFileSize, + writer.getRecordsWritten()); } } } @@ -201,13 +207,19 @@ public class WriteAheadStorePartition implements EventStorePartition { lease.relinquishClaim(); } + logger.trace("Wrote {} events to Lease {}. Relinquished claim.", storageMap.size(), lease); + // Roll over the writer if necessary Integer eventsRolledOver = null; final RolloverState rolloverState = lease.getRolloverState(); try { - if (rolloverState.isRollover() && tryRollover(lease)) { - eventsRolledOver = writer.getRecordsWritten(); - logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", this, eventsRolledOver, rolloverState); + if (rolloverState.isRollover()) { + logger.debug("Will attempt to roll over Lease {} because Rollover State is {}", lease, rolloverState); + + if (tryRollover(lease)) { + eventsRolledOver = writer.getRecordsWritten(); + logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", this, eventsRolledOver, rolloverState); + } } } catch (final IOException ioe) { logger.error("Updated {} but failed to rollover to a new Event File", this, ioe); @@ -252,6 +264,7 @@ public class WriteAheadStorePartition implements EventStorePartition { private synchronized boolean tryRollover(final RecordWriterLease lease) throws IOException { if (!Objects.equals(lease, eventWriterLeaseRef.get())) { + logger.trace("Will not rollover Lease {} because it's not the current event writer lease", lease); return false; } @@ -264,6 +277,7 @@ public class WriteAheadStorePartition implements EventStorePartition { final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); if (!updated) { + logger.trace("Did not update Event Writer Lease. Will remain {}. Not rolling over Lease.", lease); try { updatedWriter.close(); } catch (final Exception e) { @@ -274,6 +288,7 @@ public class WriteAheadStorePartition implements EventStorePartition { return false; } + logger.trace("Updated lease from {} to {}", lease, updatedLease); if (lease != null) { lease.close(); } @@ -292,8 +307,10 @@ public class WriteAheadStorePartition implements EventStorePartition { throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); } } + logger.debug("Queued {} ({}) for compression", lease, lease.getWriter().getFile()); } + logger.debug("Successfully rolled over Lease from {} to {}", lease, updatedLease); return true; }