mirror of https://github.com/apache/nifi.git
NIFI-939: Fixed bug in equals and hashCode methods of StandardContentClaim; added unit test to TestStandardProcessSession in order to verify behavior of append() method
This commit is contained in:
parent
291e65c6ef
commit
3780159d01
|
@ -46,7 +46,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
|
|||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result;
|
||||
result = prime * result + (int) (length ^ length >>> 32);
|
||||
result = prime * result + (int) (offset ^ offset >>> 32);
|
||||
result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
|
||||
return result;
|
||||
|
@ -67,10 +66,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
|
|||
}
|
||||
|
||||
final ContentClaim other = (ContentClaim) obj;
|
||||
if (length != other.getLength()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (offset != other.getOffset()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -102,4 +97,9 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
|
|||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StandardContentClaim [resourceClaim=" + resourceClaim + ", offset=" + offset + ", length=" + length + "]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FilterOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -69,6 +70,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
|
|||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRepository;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -679,6 +681,45 @@ public class TestStandardProcessSession {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() throws IOException {
|
||||
FlowFile ff = session.create();
|
||||
ff = session.append(ff, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write("Hello".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
// do not allow the content repo to be read from; this ensures that we are
|
||||
// not copying the data each time we call append but instead are actually appending to the output stream
|
||||
contentRepo.disableRead = true;
|
||||
ff = session.append(ff, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write(", ".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
ff = session.append(ff, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write("World".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
contentRepo.disableRead = false;
|
||||
final byte[] buff = new byte["Hello, World".getBytes().length];
|
||||
session.read(ff, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
StreamUtils.fillBuffer(in, buff);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals("Hello, World", new String(buff));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
|
@ -1076,6 +1117,7 @@ public class TestStandardProcessSession {
|
|||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
private final AtomicLong claimsRemoved = new AtomicLong(0L);
|
||||
private ResourceClaimManager claimManager;
|
||||
private boolean disableRead = false;
|
||||
|
||||
private final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -1193,13 +1235,17 @@ public class TestStandardProcessSession {
|
|||
@Override
|
||||
public long importFrom(Path content, ContentClaim claim) throws IOException {
|
||||
Files.copy(content, getPath(claim));
|
||||
return Files.size(content);
|
||||
final long size = Files.size(content);
|
||||
((StandardContentClaim) claim).setLength(size);
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long importFrom(InputStream content, ContentClaim claim) throws IOException {
|
||||
Files.copy(content, getPath(claim));
|
||||
return Files.size(getPath(claim));
|
||||
final long size = Files.size(getPath(claim));
|
||||
((StandardContentClaim) claim).setLength(size);
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1229,6 +1275,10 @@ public class TestStandardProcessSession {
|
|||
|
||||
@Override
|
||||
public InputStream read(ContentClaim claim) throws IOException {
|
||||
if (disableRead) {
|
||||
throw new IOException("Reading from repo is disabled by unit test");
|
||||
}
|
||||
|
||||
if (claim == null) {
|
||||
return new ByteArrayInputStream(new byte[0]);
|
||||
}
|
||||
|
@ -1241,7 +1291,7 @@ public class TestStandardProcessSession {
|
|||
}
|
||||
|
||||
@Override
|
||||
public OutputStream write(ContentClaim claim) throws IOException {
|
||||
public OutputStream write(final ContentClaim claim) throws IOException {
|
||||
final Path path = getPath(claim);
|
||||
final File file = path.toFile();
|
||||
final File parentFile = file.getParentFile();
|
||||
|
@ -1249,7 +1299,27 @@ public class TestStandardProcessSession {
|
|||
if (!parentFile.exists() && !parentFile.mkdirs()) {
|
||||
throw new IOException("Unable to create directory " + parentFile.getAbsolutePath());
|
||||
}
|
||||
return new FileOutputStream(file);
|
||||
|
||||
final OutputStream fos = new FileOutputStream(file);
|
||||
return new FilterOutputStream(fos) {
|
||||
@Override
|
||||
public void write(final int b) throws IOException {
|
||||
fos.write(b);
|
||||
((StandardContentClaim) claim).setLength(claim.getLength() + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
fos.write(b, off, len);
|
||||
((StandardContentClaim) claim).setLength(claim.getLength() + len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
fos.write(b);
|
||||
((StandardContentClaim) claim).setLength(claim.getLength() + b.length);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue