NIFI-1433: Once we roll over journal files, don't attempt to roll them over again

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2016-01-23 15:29:58 -05:00 committed by Aldrin Piri
parent 5c36358bc2
commit 05dabe034c
4 changed files with 210 additions and 75 deletions

View File

@ -240,6 +240,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final Exception e) {
logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
logger.error("", e);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to roll over Provenance Event Log due to " + e.toString());
}
}
} finally {
@ -730,7 +731,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final Exception e) {
logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString());
logger.error("", e);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Log due to " + e.toString());
} finally {
// we must re-lock the readLock, as the finally block below is going to unlock it.
readLock.lock();
@ -756,7 +757,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final IOException e) {
logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString());
logger.error("", e);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Log due to " + e.toString());
}
}
} finally {
@ -1142,6 +1143,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return journalFileCount;
}
/**
 * Acquires the write lock, performs a rollover of the current journal files, and
 * releases the lock. Method is exposed for unit testing.
 *
 * @param force whether or not to force a rollover.
 * @throws IOException if unable to complete rollover
 */
void rolloverWithLock(final boolean force) throws IOException {
writeLock.lock();
try {
// rollover() requires that the caller hold the write lock
rollover(force);
} finally {
// always release the lock, even if rollover throws
writeLock.unlock();
}
}
/**
* <p>
* MUST be called with the write lock held.
@ -1163,19 +1180,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
if (force || recordsWrittenSinceRollover.get() > 0L || dirtyWriterCount.get() > 0) {
final List<File> journalsToMerge = new ArrayList<>();
for (final RecordWriter writer : writers) {
final File writerFile = writer.getFile();
journalsToMerge.add(writerFile);
try {
writer.close();
} catch (final IOException ioe) {
logger.warn("Failed to close {} due to {}", writer, ioe.toString());
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
if (!writer.isClosed()) {
final File writerFile = writer.getFile();
journalsToMerge.add(writerFile);
try {
writer.close();
} catch (final IOException ioe) {
logger.warn("Failed to close {} due to {}", writer, ioe.toString());
if (logger.isDebugEnabled()) {
logger.warn("", ioe);
}
}
}
}
if ( logger.isDebugEnabled() ) {
logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
if (logger.isDebugEnabled()) {
if (journalsToMerge.isEmpty()) {
logger.debug("No journals to merge; all RecordWriters were already closed");
} else {
logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
}
}
// Choose a storage directory to store the merged file in.
@ -1183,66 +1207,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final List<File> storageDirs = configuration.getStorageDirectories();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
// Run the rollover logic in a background thread.
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
try {
final File fileRolledOver;
Future<?> future = null;
if (!journalsToMerge.isEmpty()) {
// Run the rollover logic in a background thread.
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
try {
fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
} catch (final IOException ioe) {
logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
logger.error("", ioe);
return;
}
final File fileRolledOver;
if (fileRolledOver == null) {
logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
return;
}
final File file = fileRolledOver;
// update our map of id to Path
// We need to make sure that another thread doesn't also update the map at the same time. We cannot
// use the write lock when purging old events, and we want to use the same approach here.
boolean updated = false;
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
newIdToPathMap.putAll(existingPathMap);
newIdToPathMap.put(fileFirstEventId, file.toPath());
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
}
logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
rolloverCompletions.getAndIncrement();
// We have finished successfully. Cancel the future so that we don't run anymore
Future<?> future;
while ((future = futureReference.get()) == null) {
try {
Thread.sleep(10L);
} catch (final InterruptedException ie) {
fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
} catch (final IOException ioe) {
logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
logger.error("", ioe);
return;
}
if (fileRolledOver == null) {
logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
return;
}
final File file = fileRolledOver;
// update our map of id to Path
// We need to make sure that another thread doesn't also update the map at the same time. We cannot
// use the write lock when purging old events, and we want to use the same approach here.
boolean updated = false;
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
newIdToPathMap.putAll(existingPathMap);
newIdToPathMap.put(fileFirstEventId, file.toPath());
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
}
logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
rolloverCompletions.getAndIncrement();
// We have finished successfully. Cancel the future so that we don't run anymore
Future<?> future;
while ((future = futureReference.get()) == null) {
try {
Thread.sleep(10L);
} catch (final InterruptedException ie) {
}
}
future.cancel(false);
} catch (final Throwable t) {
logger.error("Failed to rollover Provenance repository due to {}", t.toString());
logger.error("", t);
}
future.cancel(false);
} catch (final Throwable t) {
logger.error("Failed to rollover Provenance repository due to {}", t.toString());
logger.error("", t);
}
}
};
};
// We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
// fail for some reason. When we succeed, the Runnable will cancel itself.
final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
futureReference.set(future);
// We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
// fail for some reason. When we succeed, the Runnable will cancel itself.
future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
futureReference.set(future);
}
streamStartTime.set(System.currentTimeMillis());
bytesWrittenSinceRollover.set(0);
@ -1271,7 +1298,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
// if a shutdown happens while we are in this loop, kill the rollover thread and break
if (this.closed.get()) {
future.cancel(true);
if (future != null) {
future.cancel(true);
}
break;
}
@ -1504,7 +1534,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
}
}
}

View File

@ -51,6 +51,7 @@ public class StandardRecordWriter implements RecordWriter {
private ByteCountingOutputStream byteCountingOut;
private long lastBlockOffset = 0L;
private int recordCount = 0;
private volatile boolean closed = false;
private final Lock lock = new ReentrantLock();
@ -295,6 +296,8 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public synchronized void close() throws IOException {
closed = true;
logger.trace("Closing Record Writer for {}", file.getName());
lock();
@ -330,7 +333,11 @@ public class StandardRecordWriter implements RecordWriter {
} finally {
unlock();
}
}
@Override
public boolean isClosed() {
// 'closed' is declared volatile, so this read observes the most recent
// assignment made by close(), regardless of which thread called it
return closed;
}
@Override

View File

@ -92,4 +92,9 @@ public interface RecordWriter extends Closeable {
* @return the TOC Writer that is being used to write the Table of Contents for this journal
*/
TocWriter getTocWriter();
/**
 * Indicates whether this writer has already been closed. Callers can use this to
 * avoid re-closing (and re-rolling-over) journals that were already rolled over.
 *
 * @return <code>true</code> if this Writer has been closed via the {@link #close()} method, <code>false</code> otherwise
 */
boolean isClosed();
}

View File

@ -27,6 +27,7 @@ import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -89,6 +90,8 @@ public class TestPersistentProvenanceRepository {
private RepositoryConfiguration config;
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
private EventReporter eventReporter;
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
private RepositoryConfiguration createConfiguration() {
config = new RepositoryConfiguration();
@ -107,6 +110,17 @@ public class TestPersistentProvenanceRepository {
// Runs before each test: prints a banner containing the test name and resets the
// shared event-capturing reporter so every test starts with an empty event list.
@Before
public void printTestName() {
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
// discard any events captured by the previous test
reportedEvents.clear();
// reporter that both records each event (so tests can assert on it) and echoes it to stdout
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(Severity severity, String category, String message) {
reportedEvents.add(new ReportedEvent(severity, category, message));
System.out.println(severity + " : " + category + " : " + message);
}
};
}
@After
@ -146,14 +160,7 @@ public class TestPersistentProvenanceRepository {
private EventReporter getEventReporter() {
return new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(Severity severity, String category, String message) {
System.out.println(severity + " : " + category + " : " + message);
}
};
return eventReporter;
}
@Test
@ -1238,6 +1245,68 @@ public class TestPersistentProvenanceRepository {
assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
}
// Verifies the NIFI-1433 fix: a failure to create new RecordWriters during one
// rollover must not prevent a later rollover from succeeding, and journals that
// were already closed/rolled over must not be rolled over a second time.
@Test
public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxAttributeChars(50);
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
// Create a repo that will allow only a single writer to be created.
// The second call to createWriters (iterations == 1) throws the injected failure.
final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
int iterations = 0;
@Override
protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
if (iterations++ == 1) {
throw failure;
} else {
return super.createWriters(config, initialRecordId);
}
}
};
// initialize with our event reporter
repo.initialize(getEventReporter());
// create some events in the journal files.
final Map<String, String> attributes = new HashMap<>();
attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
for (int i = 0; i < 50; i++) {
final ProvenanceEventRecord event = builder.build();
repo.registerEvent(event);
}
// Attempt to rollover but fail to create new writers.
try {
repo.rolloverWithLock(true);
Assert.fail("Expected to get IOException when calling rolloverWithLock");
} catch (final IOException ioe) {
// identity check: must be the exact exception injected above, not a wrapper
assertTrue(ioe == failure);
}
// Wait for the first rollover to succeed.
repo.waitForRollover();
// This time when we rollover, we should not have a problem rolling over.
repo.rolloverWithLock(true);
// Ensure that no errors were reported.
assertEquals(0, reportedEvents.size());
}
@Test
public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
@ -1343,4 +1412,28 @@ public class TestPersistentProvenanceRepository {
}
}
/**
 * Immutable holder for a single event passed to the test's EventReporter,
 * capturing the severity, category, and message that were reported.
 */
private static class ReportedEvent {
    private final Severity severity;
    private final String category;
    private final String message;

    public ReportedEvent(final Severity eventSeverity, final String eventCategory, final String eventMessage) {
        this.severity = eventSeverity;
        this.category = eventCategory;
        this.message = eventMessage;
    }

    public Severity getSeverity() {
        return severity;
    }

    public String getCategory() {
        return category;
    }

    public String getMessage() {
        return message;
    }
}
}