NIFI-6110: Updated StandardProcessSession so that if we fail to update the FlowFile Repository, we do not decrement claimant counts for any FlowFiles that were removed. Doing so can cause an issue where a FlowFile is removed, the FlowFile Repository update then fails, and the FlowFile is rolled back — but only after its claimant count has been decremented. It will then be processed again, which can result in the same failure, so we would end up decrementing the claimant count repeatedly. Also updated LengthDelimitedJournal so that if the overflow directory already exists, it does not fail when trying to create the directory and instead just moves on. Updated unit tests to cover both of these fixes, and made DummyRecordSerde thread-safe because TestLengthDelimitedJournal now requires it, due to the new concurrent unit test that was added.

This closes #3358.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-03-08 08:55:53 -05:00 committed by Bryan Bende
parent 79e05c9f58
commit 58a25cfa5a
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 170 additions and 38 deletions

View File

@ -40,6 +40,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.HashMap;
@ -218,6 +219,11 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
}
// Visible/overrideable for testing.
// Uses Files.createDirectories, which is a no-op when the directory already
// exists (unlike Files.createDirectory, which throws FileAlreadyExistsException),
// so two threads racing to create the overflow directory cannot fail here.
// Tests override this to inject behavior around the directory creation.
protected void createOverflowDirectory(final Path path) throws IOException {
Files.createDirectories(path);
}
@Override
public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException {
if (!headerWritten) {
@ -246,7 +252,7 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
final int size = bados.getByteArrayOutputStream().size();
if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) {
if (!overflowDirectory.exists()) {
Files.createDirectory(overflowDirectory.toPath());
createOverflowDirectory(overflowDirectory.toPath());
}
// If we have exceeded our threshold for how much to serialize in memory,

View File

@ -17,16 +17,21 @@
package org.apache.nifi.wali;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -36,15 +41,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestLengthDelimitedJournal {
private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal");
@ -307,6 +311,71 @@ public class TestLengthDelimitedJournal {
}
}
/**
 * Verifies that two threads whose updates both exceed the in-heap serialization
 * threshold (set to 3820 bytes here) can race to create the overflow directory
 * without either update failing. The overridden createOverflowDirectory calls the
 * real implementation twice to additionally prove that a redundant create of an
 * already-existing overflow directory is harmless.
 */
@Test
public void testMultipleThreadsCreatingOverflowDirectory() throws IOException, InterruptedException {
final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, streamPool, 3820L, 100) {
@Override
protected void createOverflowDirectory(final Path path) throws IOException {
// Create the overflow directory.
super.createOverflowDirectory(path);
// Ensure that a second call to create the overflow directory will not cause an issue.
super.createOverflowDirectory(path);
}
};
// Ensure that the overflow directory does not exist.
journal.dispose();
try {
journal.writeHeader();
// Build two disjoint batches of 1,000 records each; both batches are large
// enough to push serialization past the 3820-byte threshold and into the
// overflow directory.
final List<DummyRecord> largeCollection1 = new ArrayList<>();
for (int i=0; i < 1_000; i++) {
largeCollection1.add(new DummyRecord(String.valueOf(i), UpdateType.CREATE));
}
final Map<String, DummyRecord> recordMap = largeCollection1.stream()
.collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
// Second batch uses IDs offset by 5,000,000 so the two collections never collide.
final List<DummyRecord> largeCollection2 = new ArrayList<>();
for (int i=0; i < 1_000; i++) {
largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 + i), UpdateType.CREATE));
}
final Map<String, DummyRecord> recordMap2 = largeCollection2.stream()
.collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
// Each thread records any exception so the main thread can assert on it after join.
final AtomicReference<Exception> thread1Failure = new AtomicReference<>();
final Thread t1 = new Thread(() -> {
try {
journal.update(largeCollection1, recordMap::get);
} catch (final Exception e) {
e.printStackTrace();
thread1Failure.set(e);
}
});
t1.start();
final AtomicReference<Exception> thread2Failure = new AtomicReference<>();
final Thread t2 = new Thread(() -> {
try {
journal.update(largeCollection2, recordMap2::get);
} catch (final Exception e) {
e.printStackTrace();
thread2Failure.set(e);
}
});
t2.start();
// Wait for both updates to complete, then verify that neither thread failed
// while racing to create the overflow directory.
t1.join();
t2.join();
assertNull(thread1Failure.get());
assertNull(thread2Failure.get());
} finally {
journal.close();
}
}
@Test
public void testTruncatedJournalFile() throws IOException {
final DummyRecord firstRecord, secondRecord;

View File

@ -43,7 +43,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
@SuppressWarnings("fallthrough")
@Override
public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
public synchronized void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) {
throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE");
}
@ -80,13 +80,13 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
}
@Override
public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException {
public synchronized void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException {
serializeEdit(null, record, out);
}
@Override
@SuppressWarnings("fallthrough")
public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
public synchronized DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
if (externalRecords != null) {
final DummyRecord record = externalRecords.poll();
if (record != null) {
@ -122,7 +122,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
}
@Override
public boolean isMoreInExternalFile() {
public synchronized boolean isMoreInExternalFile() {
return externalRecords != null && !externalRecords.isEmpty();
}
@ -189,11 +189,11 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
return 1;
}
public void setThrowIOEAfterNSerializeEdits(final int n) {
public synchronized void setThrowIOEAfterNSerializeEdits(final int n) {
this.throwIOEAfterNserializeEdits = n;
}
public void setThrowOOMEAfterNSerializeEdits(final int n) {
public synchronized void setThrowOOMEAfterNSerializeEdits(final int n) {
this.throwOOMEAfterNserializeEdits = n;
}
@ -208,7 +208,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
}
@Override
public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
public synchronized void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
out.write(EXTERNAL_FILE_INDICATOR);
out.writeUTF(externalFile.getAbsolutePath());

View File

@ -359,8 +359,25 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long updateProvenanceStart = System.nanoTime();
updateProvenanceRepo(checkpoint);
final long claimRemovalStart = System.nanoTime();
final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
final long flowFileRepoUpdateStart = System.nanoTime();
final long updateProvenanceNanos = flowFileRepoUpdateStart - updateProvenanceStart;
// Update the FlowFile Repository
try {
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
} catch (final IOException ioe) {
// if we fail to commit the session, we need to roll back
// the checkpoints as well because none of the checkpoints
// were ever committed.
rollback(false, true);
throw new ProcessException("FlowFile Repository failed to update", ioe);
}
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
final long claimRemovalStart = flowFileRepoUpdateFinishNanos;
/**
* Figure out which content claims can be released. At this point,
@ -401,25 +418,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long claimRemovalFinishNanos = System.nanoTime();
final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
// Update the FlowFile Repository
try {
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
} catch (final IOException ioe) {
// if we fail to commit the session, we need to roll back
// the checkpoints as well because none of the checkpoints
// were ever committed.
rollback(false, true);
throw new ProcessException("FlowFile Repository failed to update", ioe);
}
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
updateEventRepository(checkpoint);
final long updateEventRepositoryFinishNanos = System.nanoTime();
final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
// transfer the flowfiles to the connections' queues.
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();

View File

@ -92,6 +92,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
@ -324,6 +325,60 @@ public class TestStandardProcessSession {
}
/**
 * Verifies the NIFI-6110 fix: when the FlowFile Repository update fails during
 * session commit, the session is rolled back, but the claimant count for the
 * content claim of a REMOVED FlowFile must NOT be decremented. Otherwise the
 * FlowFile would be re-processed and the count decremented again on every retry.
 */
@Test
public void testUpdateFlowFileRepoFailsOnSessionCommit() throws IOException {
final ContentClaim contentClaim = contentRepo.create("original".getBytes());
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(8L)
.contentClaim(contentClaim)
.build();
flowFileQueue.put(flowFileRecord);
final Relationship relationship = new Relationship.Builder().name("A").build();
FlowFile ff1 = session.get();
assertNotNull(ff1);
// Fork a child FlowFile.
final FlowFile child = session.create(flowFileRecord);
// Overwrite the original's content (creating a new, temporary claim), then remove it.
final FlowFile updated = session.write(flowFileRecord, out -> out.write("update".getBytes()));
final ContentClaim updatedContentClaim = ((FlowFileRecord) updated).getContentClaim();
session.remove(updated);
final FlowFile updatedChild = session.write(child, out -> out.write("hello".getBytes(StandardCharsets.UTF_8)));
session.transfer(updatedChild, relationship);
final ContentClaim childContentClaim = ((FlowFileRecord) updatedChild).getContentClaim();
// Force the FlowFile Repository to throw on the upcoming commit.
flowFileRepo.setFailOnUpdate(true);
assertEquals(1, contentRepo.getClaimantCount(contentClaim));
// these will be the same content claim due to how the StandardProcessSession adds multiple FlowFiles' contents to a single claim.
assertSame(updatedContentClaim, childContentClaim);
assertEquals(2, contentRepo.getClaimantCount(childContentClaim));
try {
session.commit();
Assert.fail("Expected session commit to fail");
} catch (final ProcessException pe) {
// Expected
}
// Ensure that if we fail to update the flowfile repo, that the claimant count of the 'original' flowfile, which was removed, does not get decremented.
assertEquals(1, contentRepo.getClaimantCount(contentClaim));
assertEquals(0, contentRepo.getClaimantCount(updatedContentClaim)); // temporary claim should be cleaned up.
assertEquals(0, contentRepo.getClaimantCount(childContentClaim)); // temporary claim should be cleaned up.
// After rollback, the original FlowFile is back on the queue with its original size.
assertEquals(1, flowFileQueue.size().getObjectCount());
assertEquals(8L, flowFileQueue.size().getByteCount());
}
@Test
public void testCloneOriginalDataSmaller() throws IOException {
final byte[] originalContent = "hello".getBytes();