NIFI-7856: If a Provenance Event File is ready to be rolled over due to the maximum amount of time having elapsed, avoid rolling over unless there is at least one event written to the event file. Otherwise, we can have multiple RecordWriters / RecordWriterLeases pointing to the same file. This can result in data being overwritten, as well as failing to compress the event file upon rollover. Also added significant DEBUG/TRACE level logging.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4580
This commit is contained in:
Mark Payne 2020-10-07 13:30:37 -04:00 committed by Matthew Burgess
parent 91f6b42985
commit a73cd6a610
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 32 additions and 6 deletions

View File

@ -89,10 +89,12 @@ public class RecordWriterLease {
if (writer.getBytesWritten() >= maxBytes) {
return RolloverState.MAX_BYTES_REACHED;
}
if (writer.getRecordsWritten() >= maxEvents) {
final int recordsWritten = writer.getRecordsWritten();
if (recordsWritten >= maxEvents) {
return RolloverState.MAX_EVENTS_REACHED;
}
if (System.currentTimeMillis() >= maxSystemTime) {
if (recordsWritten > 0 && System.currentTimeMillis() >= maxSystemTime) {
return RolloverState.MAX_TIME_REACHED;
}
@ -123,4 +125,11 @@ public class RecordWriterLease {
}
}
}
public String toString() {
// Call super.toString() so that we have a unique hash/address added to the toString, but also include the file being written to.
// When comparing the toString() of two different leases, this helps to compare whether or not the two leases are the same object
// as well as whether or not they point to the same file.
return super.toString() + "[" + writer.getFile() + "]";
}
}

View File

@ -179,14 +179,20 @@ public class WriteAheadStorePartition implements EventStorePartition {
while (true) {
lease = getLease();
if (lease.tryClaim()) {
logger.trace("Obtained claim on Lease {}", lease);
break;
}
final RolloverState rolloverState = lease.getRolloverState();
if (rolloverState.isRollover()) {
logger.trace("Failed to obtain claim on Lease {}; Rollover State = {}", lease, rolloverState);
final boolean success = tryRollover(lease);
if (success) {
logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState);
final RecordWriter writer = lease.getWriter();
final String eventFileSize = FormatUtils.formatDataSize(writer.getBytesWritten());
logger.info("Successfully rolled over Event Writer for {} due to {}. Event File was {} and contained {} events.", this, rolloverState, eventFileSize,
writer.getRecordsWritten());
}
}
}
@ -201,13 +207,19 @@ public class WriteAheadStorePartition implements EventStorePartition {
lease.relinquishClaim();
}
logger.trace("Wrote {} events to Lease {}. Relinquished claim.", storageMap.size(), lease);
// Roll over the writer if necessary
Integer eventsRolledOver = null;
final RolloverState rolloverState = lease.getRolloverState();
try {
if (rolloverState.isRollover() && tryRollover(lease)) {
eventsRolledOver = writer.getRecordsWritten();
logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", this, eventsRolledOver, rolloverState);
if (rolloverState.isRollover()) {
logger.debug("Will attempt to roll over Lease {} because Rollover State is {}", lease, rolloverState);
if (tryRollover(lease)) {
eventsRolledOver = writer.getRecordsWritten();
logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", this, eventsRolledOver, rolloverState);
}
}
} catch (final IOException ioe) {
logger.error("Updated {} but failed to rollover to a new Event File", this, ioe);
@ -252,6 +264,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
private synchronized boolean tryRollover(final RecordWriterLease lease) throws IOException {
if (!Objects.equals(lease, eventWriterLeaseRef.get())) {
logger.trace("Will not rollover Lease {} because it's not the current event writer lease", lease);
return false;
}
@ -264,6 +277,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
if (!updated) {
logger.trace("Did not update Event Writer Lease. Will remain {}. Not rolling over Lease.", lease);
try {
updatedWriter.close();
} catch (final Exception e) {
@ -274,6 +288,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
return false;
}
logger.trace("Updated lease from {} to {}", lease, updatedLease);
if (lease != null) {
lease.close();
}
@ -292,8 +307,10 @@ public class WriteAheadStorePartition implements EventStorePartition {
throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
}
}
logger.debug("Queued {} ({}) for compression", lease, lease.getWriter().getFile());
}
logger.debug("Successfully rolled over Lease from {} to {}", lease, updatedLease);
return true;
}