NIFI-5879: Fixed bug in FileSystemRepository that can occur if an InputStream is obtained, then more data is written to the Content Claim - the InputStream would end before allowing the sequential data to be read. Also fixed bugs in LimitedInputStream related to available(), mark(), and reset() and the corresponding unit tests. Additionally, found that one call to StandardProcessSession.read() was not properly flushing the output of any Content Claim that has been written to before attempting to read it.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3207
This commit is contained in:
Mark Payne 2018-12-06 16:22:29 -05:00 committed by Matthew Burgess
parent b59fa5af1f
commit cf41c10546
5 changed files with 96 additions and 43 deletions

View File

@ -864,9 +864,16 @@ public class FileSystemRepository implements ContentRepository {
}
// see javadocs for claim.getLength() as to why we do this.
// A claim length of -1 indicates that the claim is still being written to and we don't know
// the length. In this case, we don't limit the Input Stream. If the Length has been populated, though,
// it is possible that the Length could then be extended. However, we do want to avoid ever allowing the
// stream to read past the end of the Content Claim. To accomplish this, we use a LimitedInputStream but
// provide a LongSupplier for the length instead of a Long value. this allows us to continue reading until
// we get to the end of the Claim, even if the Claim grows. This may happen, for instance, if we obtain an
// InputStream for this claim, then read from it, write more to the claim, and then attempt to read again. In
// such a case, since we have written to that same Claim, we should still be able to read those bytes.
if (claim.getLength() >= 0) {
return new LimitedInputStream(fis, claim.getLength());
return new LimitedInputStream(fis, claim::getLength);
} else {
return fis;
}

View File

@ -2267,7 +2267,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardRepositoryRecord record = getRecord(source);
try {
ensureNotAppending(record.getCurrentClaim());
final ContentClaim currentClaim = record.getCurrentClaim();
ensureNotAppending(currentClaim);
claimCache.flush(currentClaim);
} catch (final IOException e) {
throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
}

View File

@ -18,21 +18,36 @@ package org.apache.nifi.controller.repository.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.function.LongSupplier;
public class LimitedInputStream extends InputStream {
private final InputStream in;
private long limit;
private final long limit;
private final LongSupplier limitSupplier;
private long bytesRead = 0;
private long markOffset = -1L;
public LimitedInputStream(final InputStream in, final LongSupplier limitSupplier) {
this.in = in;
this.limitSupplier = Objects.requireNonNull(limitSupplier);
this.limit = -1;
}
public LimitedInputStream(final InputStream in, final long limit) {
this.in = in;
this.limit = limit;
this.limitSupplier = null;
}
private long getLimit() {
return limitSupplier == null ? limit : limitSupplier.getAsLong();
}
@Override
public int read() throws IOException {
if (bytesRead >= limit) {
if (bytesRead >= getLimit()) {
return -1;
}
@ -45,6 +60,7 @@ public class LimitedInputStream extends InputStream {
@Override
public int read(final byte[] b) throws IOException {
final long limit = getLimit();
if (bytesRead >= limit) {
return -1;
}
@ -60,6 +76,7 @@ public class LimitedInputStream extends InputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
final long limit = getLimit();
if (bytesRead >= limit) {
return -1;
}
@ -75,14 +92,14 @@ public class LimitedInputStream extends InputStream {
@Override
public long skip(final long n) throws IOException {
final long skipped = in.skip(Math.min(n, limit - bytesRead));
final long skipped = in.skip(Math.min(n, getLimit() - bytesRead));
bytesRead += skipped;
return skipped;
}
@Override
public int available() throws IOException {
return (int)(limit - bytesRead);
return (int)(getLimit() - bytesRead);
}
@Override
@ -93,8 +110,7 @@ public class LimitedInputStream extends InputStream {
@Override
public void mark(int readlimit) {
in.mark(readlimit);
limit -= bytesRead;
bytesRead = 0;
markOffset = bytesRead;
}
@Override
@ -105,6 +121,10 @@ public class LimitedInputStream extends InputStream {
@Override
public void reset() throws IOException {
in.reset();
bytesRead = 0;
if (markOffset >= 0) {
bytesRead = markOffset;
}
markOffset = -1;
}
}

View File

@ -16,12 +16,23 @@
*/
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -45,24 +56,12 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
public class TestFileSystemRepository {
@ -191,6 +190,28 @@ public class TestFileSystemRepository {
assertEquals(1, repository.getClaimantCount(claim));
}
@Test
public void testReadClaimThenWriteThenReadMore() throws IOException {
final ContentClaim claim = repository.create(false);
final OutputStream out = repository.write(claim);
out.write("hello".getBytes());
out.flush();
final InputStream in = repository.read(claim);
final byte[] buffer = new byte[5];
StreamUtils.fillBuffer(in, buffer);
assertEquals("hello", new String(buffer));
out.write("good-bye".getBytes());
out.close();
final byte[] buffer2 = new byte[8];
StreamUtils.fillBuffer(in, buffer2);
assertEquals("good-bye", new String(buffer2));
}
@Test
public void testClaimantCounts() throws IOException {
final ContentClaim claim = repository.create(true);

View File

@ -16,14 +16,14 @@
*/
package org.apache.nifi.controller.repository.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestLimitedInputStream {
@ -70,9 +70,11 @@ public class TestLimitedInputStream {
@Test
public void testSkip() throws Exception {
final LimitedInputStream lis = new LimitedInputStream(bais, 4);
lis.mark(4);
assertEquals(3, lis.read(buffer3));
assertEquals(1, lis.skip(data.length));
lis.reset();
lis.mark(4);
assertEquals(4, lis.skip(7));
lis.reset();
assertEquals(2, lis.skip(2));
@ -91,7 +93,7 @@ public class TestLimitedInputStream {
@Test
public void testAvailable() throws Exception {
final LimitedInputStream lis = new LimitedInputStream(bais, 4);
assertNotEquals(data.length, lis.available());
assertEquals(4, lis.available());
lis.reset();
assertEquals(4, lis.available());
assertEquals(1, lis.read(buffer3, 0, 1));
@ -107,14 +109,15 @@ public class TestLimitedInputStream {
@Test
public void testMark() throws Exception {
final LimitedInputStream lis = new LimitedInputStream(bais, 6);
assertEquals(3, lis.read(buffer3));
assertEquals(3, lis.read(buffer10));
lis.reset();
assertEquals(3, lis.read(buffer3));
lis.mark(1000);
assertEquals(3, lis.read(buffer3));
assertEquals(3, lis.read(buffer10));
lis.reset();
lis.mark(1000);
assertEquals(3, lis.read(buffer3));
assertEquals(3, lis.read(buffer10));
lis.reset();
assertEquals(6, lis.read(buffer10));
}
}