NIFI-1824: If attempting to archive content, and there are no claimant counts for it, ensure that the stream is closed.

This commit is contained in:
Mark Payne 2016-04-28 15:04:23 -04:00 committed by Oleg Zhurakousky
parent 1df8fe44c4
commit e3bdee8b1e
3 changed files with 196 additions and 37 deletions

View File

@ -540,6 +540,15 @@ public class FileSystemRepository implements ContentRepository {
resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
resourceOffset = 0L;
LOG.debug("Creating new Resource Claim {}", resourceClaim);
// we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads
// at the same time because we will call create() to get the claim before we write to it,
// and when we call create(), it will remove it from the Queue, which means that no other
// thread will get the same Claim until we've finished writing to it.
final File file = getPath(resourceClaim).toFile();
ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
writableClaimStreams.put(resourceClaim, claimStream);
} else {
resourceClaim = pair.getClaim();
resourceOffset = pair.getLength();
@ -841,25 +850,8 @@ public class FileSystemRepository implements ContentRepository {
final ResourceClaim resourceClaim = claim.getResourceClaim();
// we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads
// at the same time because we will call create() to get the claim before we write to it,
// and when we call create(), it will remove it from the Queue, which means that no other
// thread will get the same Claim until we've finished writing to it.
ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
final long initialLength;
if (claimStream == null) {
final File file = getPath(scc).toFile();
// use a synchronized stream because we want to pass this OutputStream out from one thread to another.
claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
initialLength = 0L;
} else {
if (append) {
initialLength = Math.max(0, scc.getLength());
} else {
initialLength = 0;
}
}
ByteCountingOutputStream claimStream = writableClaimStreams.get(scc.getResourceClaim());
final int initialLength = append ? (int) Math.max(0, scc.getLength()) : 0;
activeResourceClaims.add(resourceClaim);
final ByteCountingOutputStream bcos = claimStream;
@ -963,9 +955,9 @@ public class FileSystemRepository implements ContentRepository {
final boolean enqueued = writableClaimQueue.offer(pair);
if (enqueued) {
writableClaimStreams.put(scc.getResourceClaim(), bcos);
LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
} else {
writableClaimStreams.remove(scc.getResourceClaim());
bcos.close();
LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this);
@ -1114,6 +1106,19 @@ public class FileSystemRepository implements ContentRepository {
}
}
// If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that
// we close the stream if there is one. There may be a stream open if create() is called and then
// claimant count is removed without writing to the claim (or more specifically, without closing the
// OutputStream that is returned when calling write() ).
final OutputStream out = writableClaimStreams.remove(claim);
if (out != null) {
try {
out.close();
} catch (final IOException ioe) {
LOG.warn("Unable to close Output Stream for " + claim, ioe);
}
}
final Path curPath = getPath(claim);
if (curPath == null) {
return false;
@ -1124,7 +1129,12 @@ public class FileSystemRepository implements ContentRepository {
return archived;
}
private boolean archive(final Path curPath) throws IOException {
protected int getOpenStreamCount() {
return writableClaimStreams.size();
}
// marked protected for visibility and ability to override for unit tests.
protected boolean archive(final Path curPath) throws IOException {
// check if already archived
final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
if (alreadyArchived) {

View File

@ -35,9 +35,13 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.stream.io.StreamUtils;
@ -59,6 +63,7 @@ public class TestFileSystemRepository {
public static final File helloWorldFile = new File("src/test/resources/hello.txt");
private FileSystemRepository repository = null;
private StandardResourceClaimManager claimManager = null;
private final File rootFile = new File("target/content_repository");
@Before
@ -68,7 +73,8 @@ public class TestFileSystemRepository {
DiskUtils.deleteRecursively(rootFile);
}
repository = new FileSystemRepository();
repository.initialize(new StandardResourceClaimManager());
claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.purge();
}
@ -79,30 +85,45 @@ public class TestFileSystemRepository {
@Test
public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception {
// We are going to construct our own repository using different properties, so
// we need to shutdown the existing one.
shutdown();
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
ListAppender<ILoggingEvent> testAppender = new ListAppender<>();
testAppender.setName("Test");
testAppender.start();
root.addAppender(testAppender);
NiFiProperties.getInstance().setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis");
repository = new FileSystemRepository();
repository.initialize(new StandardResourceClaimManager());
repository.purge();
final NiFiProperties properties = NiFiProperties.getInstance();
final String originalCleanupFreq = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis");
try {
repository = new FileSystemRepository();
repository.initialize(new StandardResourceClaimManager());
repository.purge();
boolean messageFound = false;
String message = "The value of nifi.content.repository.archive.cleanup.frequency property "
boolean messageFound = false;
String message = "The value of nifi.content.repository.archive.cleanup.frequency property "
+ "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). "
+ "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task.";
for (ILoggingEvent event : testAppender.list) {
String actualMessage = event.getFormattedMessage();
if (actualMessage.equals(message)) {
assertEquals(event.getLevel(), Level.WARN);
messageFound = true;
break;
for (ILoggingEvent event : testAppender.list) {
String actualMessage = event.getFormattedMessage();
if (actualMessage.equals(message)) {
assertEquals(event.getLevel(), Level.WARN);
messageFound = true;
break;
}
}
assertTrue(messageFound);
} finally {
if (originalCleanupFreq == null) {
properties.remove(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
} else {
properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, originalCleanupFreq);
}
}
assertTrue(messageFound);
}
@Test
@ -357,13 +378,13 @@ public class TestFileSystemRepository {
@Test(expected = ContentNotFoundException.class)
public void testSizeWithNoContent() throws IOException {
final ContentClaim claim = repository.create(true);
final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L);
assertEquals(0L, repository.size(claim));
}
@Test(expected = ContentNotFoundException.class)
public void testReadWithNoContent() throws IOException {
final ContentClaim claim = repository.create(true);
final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L);
final InputStream in = repository.read(claim);
in.close();
}
@ -421,6 +442,132 @@ public class TestFileSystemRepository {
assertTrue(repository.remove(claim));
}
@Test
public void testMarkDestructableDoesNotArchiveIfStreamOpenAndWrittenTo() throws IOException, InterruptedException {
FileSystemRepository repository = null;
try {
final List<Path> archivedPaths = Collections.synchronizedList(new ArrayList<Path>());
// We are creating our own 'local' repository in this test so shut down the one created in the setup() method
shutdown();
repository = new FileSystemRepository() {
@Override
protected boolean archive(Path curPath) throws IOException {
archivedPaths.add(curPath);
return true;
}
};
final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.purge();
final ContentClaim claim = repository.create(false);
// Create a stream and write a bit to it, then close it. This will cause the
// claim to be put back onto the 'writableClaimsQueue'
try (final OutputStream out = repository.write(claim)) {
assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
out.write("1\n".getBytes());
}
assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
assertEquals(0, claimantCount);
assertTrue(archivedPaths.isEmpty());
claimManager.markDestructable(claim.getResourceClaim());
// Wait for the archive thread to have a chance to run
Thread.sleep(2000L);
// Should still be empty because we have a stream open to the file.
assertTrue(archivedPaths.isEmpty());
assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim()));
} finally {
if (repository != null) {
repository.shutdown();
}
}
}
/**
* We have encountered a situation where the File System Repo is moving files to archive and then eventually
* aging them off while there is still an open file handle. This test is meant to replicate the conditions under
* which this would happen and verify that it is fixed.
*
* The condition that caused this appears to be that a Process Session created a Content Claim and then did not write
* to it. It then decremented the claimant count (which reduced the count to 0). This was likely due to creating the
* claim in ProcessSession.write(FlowFile, StreamCallback) and then having an Exception thrown when the Process Session
* attempts to read the current Content Claim. In this case, it would not ever get to the point of calling
* FileSystemRepository.write().
*
* The above sequence of events is problematic because calling FileSystemRepository.create() will remove the Resource Claim
* from the 'writable claims queue' and expects that we will write to it. When we call FileSystemRepository.write() with that
* Resource Claim, we return an OutputStream that, when closed, will take care of adding the Resource Claim back to the
* 'writable claims queue' or otherwise close the FileOutputStream that is open for that Resource Claim. If FileSystemRepository.write()
* is never called, or if the OutputStream returned by that method is never closed, but the Content Claim is then decremented to 0,
* we can get into a situation where we do archive the content (because the claimant count is 0 and it is not in the 'writable claims queue')
* and then eventually age it off, without ever closing the OutputStream. We need to ensure that we do always close that Output Stream.
*/
@Test
public void testMarkDestructableDoesNotArchiveIfStreamOpenAndNotWrittenTo() throws IOException, InterruptedException {
FileSystemRepository repository = null;
try {
final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>());
// We are creating our own 'local' repository in this test so shut down the one created in the setup() method
shutdown();
repository = new FileSystemRepository() {
@Override
protected boolean archive(Path curPath) throws IOException {
if (getOpenStreamCount() > 0) {
archivedPathsWithOpenStream.add(curPath);
}
return true;
}
};
final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.purge();
final ContentClaim claim = repository.create(false);
assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
assertEquals(0, claimantCount);
assertTrue(archivedPathsWithOpenStream.isEmpty());
// This would happen when FlowFile repo is checkpointed, if Resource Claim has claimant count of 0.
// Since the Resource Claim of interest is still 'writable', we should not archive it.
claimManager.markDestructable(claim.getResourceClaim());
// Wait for the archive thread to have a chance to run
long totalSleepMillis = 0;
final long startTime = System.nanoTime();
while (archivedPathsWithOpenStream.isEmpty() && totalSleepMillis < 5000) {
Thread.sleep(100L);
totalSleepMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
// Should still be empty because we have a stream open to the file so we should
// not actually try to archive the data.
assertTrue(archivedPathsWithOpenStream.isEmpty());
assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim()));
} finally {
if (repository != null) {
repository.shutdown();
}
}
}
@Test
public void testMergeWithHeaderFooterDemarcator() throws IOException {
testMerge("HEADER", "FOOTER", "DEMARCATOR");

View File

@ -49,6 +49,8 @@ nifi.swap.out.threads=4
nifi.content.claim.max.appendable.size=10 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/content_repository
nifi.content.repository.archive.enabled=true
nifi.content.repository.archive.max.usage.percentage=95%
# Provenance Repository Properties
nifi.provenance.repository.storage.directory=./target/provenance_repository