NIFI-6924: When seeking to the appropriate offset for a content claim, ensure that if there are not enough bytes in the underlying resource claim that a ContentNotFoundException is thrown. Also cleaned up error-handling case in StandardProcessSession to ensure that we close the existing InputStream before calling handleContenttNotFoundException, since this method may itself throw an Exception

This closes #3924.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-12-10 11:20:01 -05:00 committed by Bryan Bende
parent ba6d050ba8
commit 452ca98c29
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 92 additions and 25 deletions

View File

@ -16,7 +16,27 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@ -57,24 +77,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Is thread safe
@ -882,11 +884,20 @@ public class FileSystemRepository implements ContentRepository {
if (claim.getOffset() > 0L) {
try {
StreamUtils.skip(fis, claim.getOffset());
} catch (IOException ioe) {
} catch (final EOFException eof) {
final long resourceClaimBytes;
try {
resourceClaimBytes = Files.size(path);
} catch (final IOException e) {
throw new ContentNotFoundException(claim, "Content Claim has an offset of " + claim.getOffset()
+ " but Resource Claim has fewer than this many bytes (actual length of the resource claim could not be determined)");
}
throw new ContentNotFoundException(claim, "Content Claim has an offset of " + claim.getOffset() + " but Resource Claim " + path + " is only " + resourceClaimBytes + " bytes");
} catch (final IOException ioe) {
IOUtils.closeQuietly(fis);
throw ioe;
}
}
// A claim length of -1 indicates that the claim is still being written to and we don't know

View File

@ -56,6 +56,7 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.rocksdb.Checkpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -2361,7 +2362,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
}
final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
final InputStream rawIn;
try {
rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
} catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record);
throw nfe;
}
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@ -2375,13 +2383,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
return ffais.read();
} catch (final ContentNotFoundException cnfe) {
handleContentNotFound(cnfe, record);
close();
handleContentNotFound(cnfe, record);
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
rollback(true);
throw ffae;
}
}
@ -2396,13 +2404,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
return ffais.read(b, off, len);
} catch (final ContentNotFoundException cnfe) {
handleContentNotFound(cnfe, record);
close();
handleContentNotFound(cnfe, record);
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
rollback(true);
throw ffae;
}
}

View File

@ -21,6 +21,7 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
@ -29,6 +30,7 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -37,10 +39,12 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@ -164,6 +168,50 @@ public class TestFileSystemRepository {
assertTrue(messageFound);
}
@Test
public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {
final File contentFile = new File("target/content_repository/0/0.bin");
try (final OutputStream fos = new FileOutputStream(contentFile)) {
fos.write("Hello World".getBytes(StandardCharsets.UTF_8));
}
final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "default", "0", "0.bin", false);
final StandardContentClaim existingContentClaim = new StandardContentClaim(resourceClaim, 0);
existingContentClaim.setLength(11);
try (final InputStream in = repository.read(existingContentClaim)) {
final byte[] buff = new byte[11];
StreamUtils.fillBuffer(in, buff);
assertEquals("Hello World", new String(buff, StandardCharsets.UTF_8));
}
final StandardContentClaim halfContentClaim = new StandardContentClaim(resourceClaim, 6);
halfContentClaim.setLength(5);
try (final InputStream in = repository.read(halfContentClaim)) {
final byte[] buff = new byte[5];
StreamUtils.fillBuffer(in, buff);
assertEquals("World", new String(buff, StandardCharsets.UTF_8));
}
final StandardContentClaim emptyContentClaim = new StandardContentClaim(resourceClaim, 11);
existingContentClaim.setLength(0);
try (final InputStream in = repository.read(emptyContentClaim)) {
assertEquals(-1, in.read());
}
final StandardContentClaim missingContentClaim = new StandardContentClaim(resourceClaim, 12);
missingContentClaim.setLength(1);
try {
repository.read(missingContentClaim);
Assert.fail("Did not throw ContentNotFoundException");
} catch (final ContentNotFoundException cnfe) {
// Expected
}
}
@Test
public void testBogusFile() throws IOException {
repository.shutdown();