NIFI-2376 This closes #713. Ensure that we don't decrement claimant count more than once when append() throws an Exception

Author: Mark Payne, 2016-07-23 15:05:20 -04:00 (committed by joewitt)
parent 6932a53ec9
commit 4e08ea6525
5 changed files with 212 additions and 17 deletions
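
The heart of the change is in StandardProcessSession: when an append() callback threw, the session destroyed the newly created content claim unconditionally, even when the FlowFile already referenced that claim, so the claimant count was decremented a second time when the FlowFile was later removed. Below is a minimal sketch of that hazard and of the identity-guard fix, using simplified stand-in types rather than NiFi's real classes:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted content claim (illustration only).
class Claim {
    final AtomicInteger claimantCount = new AtomicInteger(1); // one claimant at creation

    int decrement() {
        return claimantCount.decrementAndGet();
    }
}

class DoubleDecrementSketch {
    // Buggy shape: destroy the new claim unconditionally on failure.
    static void onFailureBuggy(Claim newClaim) {
        newClaim.decrement(); // decremented here...
        // ...and decremented again when the FlowFile that still references
        // this claim is later removed, driving the count below zero.
    }

    // Fixed shape, mirroring the commit: destroy the claim only if the
    // FlowFile record never took ownership of it.
    static void onFailureFixed(Claim newClaim, Claim oldClaim) {
        if (newClaim != oldClaim) {
            newClaim.decrement(); // safe: nothing else references newClaim
        }
    }
}

The same newClaim != oldClaim guard appears in each of the three catch blocks in the StandardProcessSession diff below.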

StandardFlowFileQueue.java

@@ -79,7 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * processing. Must be thread safe.
  *
  */
-public final class StandardFlowFileQueue implements FlowFileQueue {
+public class StandardFlowFileQueue implements FlowFileQueue {

     public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
     public static final int SWAP_RECORD_POLL_SIZE = 10000;
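
Dropping final from StandardFlowFileQueue is what makes the test change further down possible: Mockito's spy() subclasses the target class at runtime, and a final class cannot be subclassed. A self-contained illustration, assuming only Mockito on the classpath; FinalQueue and OpenQueue are invented stand-ins, not NiFi classes:

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

class MockitoFinalSketch {
    // A final class cannot be spied: Mockito's default mock maker generates a
    // subclass of the target at runtime, and final classes forbid subclassing.
    static final class FinalQueue {
        int size() { return 0; }
    }

    // Dropping 'final' (as this commit does for StandardFlowFileQueue)
    // makes the spy below legal.
    static class OpenQueue {
        int size() { return 0; }
    }

    public static void main(String[] args) {
        OpenQueue spied = spy(new OpenQueue());
        doReturn(42).when(spied).size();
        System.out.println(spied.size()); // prints 42
        // spy(new FinalQueue()) would fail at runtime with a MockitoException.
    }
}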

FileSystemRepository.java

@@ -964,7 +964,7 @@ public class FileSystemRepository implements ContentRepository {
             final boolean enqueued = writableClaimQueue.offer(pair);
             if (enqueued) {
-                LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this);
             } else {
                 writableClaimStreams.remove(scc.getResourceClaim());
                 bcos.close();
@@ -1103,7 +1103,8 @@ public class FileSystemRepository implements ContentRepository {
         return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }

-    private boolean archive(final ResourceClaim claim) throws IOException {
+    // visible for testing
+    boolean archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return false;
         }

StandardProcessSession.java

@@ -330,13 +330,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
             if (record.isMarkedForDelete()) {
                 // if the working claim is not the same as the original claim, we can immediately destroy the working claim
                 // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                removeContent(record.getWorkingClaim());
+                decrementClaimCount(record.getWorkingClaim());
                 if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                     // if working & original claim are same, don't remove twice; we only want to remove the original
                     // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                     // an issue if we only updated the FlowFile attributes.
-                    removeContent(record.getOriginalClaim());
+                    decrementClaimCount(record.getOriginalClaim());
                 }

                 final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                 final Connectable connectable = context.getConnectable();
@@ -344,7 +344,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
                 LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
             } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
                 // records which have been updated - remove original if exists
-                removeContent(record.getOriginalClaim());
+                decrementClaimCount(record.getOriginalClaim());
             }
         }
@@ -923,12 +923,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
         final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
         for (final StandardRepositoryRecord record : recordsToHandle) {
             if (record.isMarkedForAbort()) {
-                removeContent(record.getWorkingClaim());
+                decrementClaimCount(record.getWorkingClaim());
                 if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
                     // if working & original claim are same, don't remove twice; we only want to remove the original
                     // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                     // an issue if we only updated the flowfile attributes.
-                    removeContent(record.getCurrentClaim());
+                    decrementClaimCount(record.getCurrentClaim());
                 }
                 abortedRecords.add(record);
             } else {
@@ -1020,7 +1020,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
     }

-    private void removeContent(final ContentClaim claim) {
+    private void decrementClaimCount(final ContentClaim claim) {
         if (claim == null) {
             return;
         }
@@ -1733,7 +1733,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
                 record.markForDelete();
                 expiredRecords.add(record);
                 expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
-                removeContent(flowFile.getContentClaim());
+                decrementClaimCount(flowFile.getContentClaim());

                 final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                 final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
@@ -2198,9 +2198,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
                 originalByteWrittenCount = outStream.getBytesWritten();

                 // wrap our OutputStreams so that the processor cannot close it
-                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
+                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
+                    final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
                     recursionSet.add(source);
-                    writer.process(disableOnClose);
+                    writer.process(flowFileAccessOutStream);
                 } finally {
                     recursionSet.remove(source);
                 }
@@ -2210,15 +2211,37 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
                 newSize = outStream.getBytesWritten();
             } catch (final ContentNotFoundException nfe) {
                 resetWriteClaims(); // need to reset write claim before we can remove the claim
-                destroyContent(newClaim);
+
+                // If the content claim changed, then we should destroy the new one. We do this
+                // because the new content claim will never get set as the 'working claim' for the FlowFile
+                // record since we will throw an Exception. As a result, we need to ensure that we have
+                // appropriately decremented the claimant count and can destroy the content if it is no
+                // longer in use. However, it is critical that we do this ONLY if the content claim has
+                // changed. Otherwise, the FlowFile already has a reference to this Content Claim and
+                // whenever the FlowFile is removed, the claim count will be decremented; if we decremented
+                // it here also, we would be decrementing the claimant count twice!
+                if (newClaim != oldClaim) {
+                    destroyContent(newClaim);
+                }
+
                 handleContentNotFound(nfe, record);
             } catch (final IOException ioe) {
                 resetWriteClaims(); // need to reset write claim before we can remove the claim
-                destroyContent(newClaim);
-                throw new FlowFileAccessException("Exception in callback: " + ioe.toString(), ioe);
+
+                // See above explanation for why this is done only if newClaim != oldClaim
+                if (newClaim != oldClaim) {
+                    destroyContent(newClaim);
+                }
+
+                throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
             } catch (final Throwable t) {
                 resetWriteClaims(); // need to reset write claim before we can remove the claim
-                destroyContent(newClaim);
+
+                // See above explanation for why this is done only if newClaim != oldClaim
+                if (newClaim != oldClaim) {
+                    destroyContent(newClaim);
+                }
+
                 throw t;
             } finally {
                 if (outStream != null) {
@@ -2227,6 +2250,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
                 }
             }

+            // If the record already has a working claim, and this is the first time that we are appending to the FlowFile,
+            // destroy the current working claim because it is a temporary claim that is no longer going to be used,
+            // as we are about to set a new working claim. This would happen, for instance, if the FlowFile was
+            // written to via #write() and then append() was called.
             if (newClaim != oldClaim) {
                 removeTemporaryClaim(record);
             }
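
Two behavioral changes ride along in this file besides the double-decrement guard: the callback's OutputStream is now additionally wrapped in a FlowFileAccessOutputStream, and an IOException from the callback is rethrown as a ProcessException instead of a FlowFileAccessException. A minimal sketch of the wrap-and-translate pattern, with simplified stand-ins rather than the real NiFi stream classes:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Simplified stand-in for DisableOnCloseOutputStream (illustration only).
class DisableOnCloseStream extends FilterOutputStream {
    DisableOnCloseStream(OutputStream out) { super(out); }

    @Override
    public void close() {
        // Swallow close() so a processor callback cannot close the
        // session-managed stream out from under the framework.
    }
}

// Simplified stand-in for NiFi's unchecked ProcessException.
class ProcessException extends RuntimeException {
    ProcessException(String message, Throwable cause) { super(message, cause); }
}

class CallbackRunner {
    interface OutputStreamCallback {
        void process(OutputStream out) throws IOException;
    }

    static void run(OutputStream raw, OutputStreamCallback writer) {
        try (OutputStream guarded = new DisableOnCloseStream(raw)) {
            writer.process(guarded);
        } catch (IOException ioe) {
            // Translate checked IO failures into an unchecked session-level
            // type, as the commit does with ProcessException.
            throw new ProcessException("IOException thrown from callback: " + ioe, ioe);
        }
    }
}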

TestFileSystemRepository.java

@@ -495,6 +495,55 @@ public class TestFileSystemRepository {
     }

+    @Test
+    public void testWriteCannotProvideNullOutput() throws IOException {
+        FileSystemRepository repository = null;
+        try {
+            final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>());
+
+            // We are creating our own 'local' repository in this test so shut down the one created in the setup() method
+            shutdown();
+
+            repository = new FileSystemRepository() {
+                @Override
+                protected boolean archive(Path curPath) throws IOException {
+                    if (getOpenStreamCount() > 0) {
+                        archivedPathsWithOpenStream.add(curPath);
+                    }
+                    return true;
+                }
+            };
+
+            final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
+            repository.initialize(claimManager);
+            repository.purge();
+
+            final ContentClaim claim = repository.create(false);
+            assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
+
+            int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
+            assertEquals(0, claimantCount);
+            assertTrue(archivedPathsWithOpenStream.isEmpty());
+
+            OutputStream out = repository.write(claim);
+            out.close();
+            repository.decrementClaimantCount(claim);
+
+            ContentClaim claim2 = repository.create(false);
+            assertEquals(claim.getResourceClaim(), claim2.getResourceClaim());
+            out = repository.write(claim2);
+
+            final boolean archived = repository.archive(claim.getResourceClaim());
+            assertFalse(archived);
+        } finally {
+            if (repository != null) {
+                repository.shutdown();
+            }
+        }
+    }
+
     /**
      * We have encountered a situation where the File System Repo is moving files to archive and then eventually
      * aging them off while there is still an open file handle. This test is meant to replicate the conditions under
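
The test above exercises the claimant-count lifecycle that archiving depends on: a resource claim may only be archived once no content claim references it and nothing is writing to it, which is why archive() must return false while claim2 still reuses the same resource claim. A compact model of that counting rule (illustrative only, not the repository's actual implementation):

import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of resource-claim bookkeeping (illustration only).
class ResourceClaimModel {
    private final AtomicInteger claimantCount = new AtomicInteger(0);
    private volatile boolean streamOpen = false;

    int incrementClaimantCount() { return claimantCount.incrementAndGet(); }
    int decrementClaimantCount() { return claimantCount.decrementAndGet(); }
    void openStream() { streamOpen = true; }
    void closeStream() { streamOpen = false; }

    // Mirrors the rule the test asserts: archiving must refuse while the
    // resource claim is still referenced or still being written to.
    boolean archive() {
        return claimantCount.get() == 0 && !streamOpen;
    }
}

class ArchiveSketch {
    public static void main(String[] args) {
        ResourceClaimModel claim = new ResourceClaimModel();
        claim.incrementClaimantCount();      // claim created -> count 1
        claim.decrementClaimantCount();      // released      -> count 0
        claim.incrementClaimantCount();      // reused by a second content claim
        claim.openStream();                  // still being written to
        System.out.println(claim.archive()); // false, as assertFalse(archived) expects
    }
}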

TestStandardProcessSession.java

@@ -68,6 +68,7 @@ import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.MissingFlowFileException;
@@ -144,7 +145,8 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);

         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        flowFileQueue = Mockito.spy(actualQueue);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);

         Mockito.doAnswer(new Answer<Object>() {
@@ -206,6 +208,71 @@ public class TestStandardProcessSession {
         session = new StandardProcessSession(context);
     }

+    @Test
+    public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+        child = session.append(child, out -> out.write("hello".getBytes()));
+
+        // Force an IOException. This will decrement our claim count for the resource claim.
+        try {
+            child = session.append(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
+    @Test
+    public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+
+        // Write, then force an IOException on a second write. The failed write must decrement our claim count exactly once.
+        try {
+            child = session.write(child, out -> out.write("hello".getBytes()));
+            child = session.write(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("write() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
     @Test
     public void testModifyContentThenRollback() throws IOException {
         assertEquals(0, contentRepo.getExistingClaims().size());
@@ -806,6 +873,57 @@ public class TestStandardProcessSession {
         assertEquals("Hello, World", new String(buff));
     }

+    @Test
+    public void testAppendDoesNotDecrementContentClaimIfNotNeeded() {
+        FlowFile flowFile = session.create();
+        session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("hello".getBytes());
+            }
+        });
+
+        final Set<ContentClaim> existingClaims = contentRepo.getExistingClaims();
+        assertEquals(1, existingClaims.size());
+        final ContentClaim claim = existingClaims.iterator().next();
+
+        final int countAfterAppend = contentRepo.getClaimantCount(claim);
+        assertEquals(1, countAfterAppend);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testExpireDecrementsClaimsOnce() throws IOException {
+        final ContentClaim contentClaim = contentRepo.create(false);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentClaim)
+            .build();
+
+        Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
+            int iterations = 0;
+
+            @Override
+            public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
+                if (iterations++ == 0) {
+                    final Set<FlowFileRecord> expired = invocation.getArgumentAt(1, Set.class);
+                    expired.add(flowFileRecord);
+                }
+
+                return null;
+            }
+        }).when(flowFileQueue).poll(Mockito.any(FlowFileFilter.class), Mockito.any(Set.class));
+
+        session.expireFlowFiles();
+        session.commit(); // if the content claim count is decremented to less than 0, an exception will be thrown.
+        assertEquals(1L, contentRepo.getClaimsRemoved());
+    }
+
     @Test
     public void testManyFilesOpened() throws IOException {
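
For processor authors, the contract these tests pin down is: a failing append() or write() callback surfaces as a ProcessException, after which it is safe to remove the FlowFile without the claimant count being decremented twice. A usage sketch against the standard ProcessSession API; appendToChild and REL_SUCCESS are hypothetical, not part of NiFi:

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

class AppendUsageSketch {
    // Hypothetical relationship for illustration; real processors declare their own.
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

    // Hypothetical helper showing the post-fix contract of append().
    static void appendToChild(ProcessSession session, FlowFile original) {
        FlowFile child = session.create(original);
        try {
            child = session.append(child, out -> out.write("hello".getBytes()));
        } catch (final ProcessException pe) {
            // An IOException from the callback now arrives wrapped in a
            // ProcessException; removing the child here decrements its content
            // claim exactly once, as testAppendToChildThrowsIOExceptionThenRemove verifies.
            session.remove(child);
            throw pe;
        }
        session.transfer(child, REL_SUCCESS);
    }
}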