NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing to close the resource before attempting to compress it. Fixed that.

NIFI-4439: Addressed threading bug that can occur when rolling over provenance record writer

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-10-03 09:44:54 -04:00 committed by joewitt
parent ba8f17bac0
commit b950eed1a5
1 changed files with 34 additions and 25 deletions

View File

@ -253,38 +253,47 @@ public class WriteAheadStorePartition implements EventStorePartition {
final long nextEventId = idGenerator.get(); final long nextEventId = idGenerator.get();
final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov"); final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true); final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
if (updated) { // Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has
updatedWriter.writeHeader(nextEventId); // been fully initialized (i.e., the header has been written, etc.)
synchronized (updatedWriter) {
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
synchronized (minEventIdToPathMap) { if (updated) {
minEventIdToPathMap.put(nextEventId, updatedEventFile); if (lease != null) {
} lease.close();
}
if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { updatedWriter.writeHeader(nextEventId);
boolean offered = false;
while (!offered && !closed) { synchronized (minEventIdToPathMap) {
try { minEventIdToPathMap.put(nextEventId, updatedEventFile);
offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); }
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt(); if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); boolean offered = false;
while (!offered && !closed) {
try {
offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
}
} }
} }
}
return true; return true;
} else { } else {
try { try {
updatedWriter.close(); updatedWriter.close();
} catch (final Exception e) { } catch (final Exception e) {
logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e);
} }
updatedEventFile.delete(); updatedEventFile.delete();
return false; return false;
}
} }
} }