Merge branch 'inputstream-callback-protection' of https://github.com/rowolabi/incubator-nifi into develop

This commit is contained in:
Mark Payne 2015-03-20 09:11:39 -04:00
commit e88ed13d8d
4 changed files with 280 additions and 13 deletions

View File

@ -484,6 +484,9 @@ public interface ProcessSession {
/**
* Executes the given callback against the contents corresponding to the
* given FlowFile.
*
* <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param source
* @param reader
@ -498,9 +501,11 @@ public interface ProcessSession {
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content
* FlowFile content; if an attempt is made to access the InputStream
* provided to the given InputStreamCallback after this method completed its
* execution
*/
void read(FlowFile source, InputStreamCallback reader);
void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException;
/**
* Combines the content of all given source FlowFiles into a single given
@ -560,7 +565,10 @@ public interface ProcessSession {
/**
* Executes the given callback against the content corresponding to the
* given FlowFile
* given FlowFile.
*
* <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param source
* @param writer
@ -576,13 +584,19 @@ public interface ProcessSession {
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content
* FlowFile content; if an attempt is made to access the OutputStream
* provided to the given OutputStreamCallaback after this method completed
* its execution
*/
FlowFile write(FlowFile source, OutputStreamCallback writer);
FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
/**
* Executes the given callback against the content corresponding to the
* given flow file
* given flow file.
*
* <i>Note</i>: The InputStream & OutputStream provided to the given
* StreamCallback will not be accessible once this method has completed its
* execution.
*
* @param source
* @param writer
@ -598,20 +612,28 @@ public interface ProcessSession {
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content
* FlowFile content; if an attempt is made to access the InputStream or
* OutputStream provided to the given StreamCallback after this method
* completed its execution
*/
FlowFile write(FlowFile source, StreamCallback writer);
FlowFile write(FlowFile source, StreamCallback writer) throws FlowFileAccessException;
/**
* Executes the given callback against the content corresponding to the
* given FlowFile, such that any data written to the OutputStream of the
* content will be appended to the end of FlowFile.
*
* <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param source
* @param writer
* @return
* @throws FlowFileAccessException if an attempt is made to access the
* OutputStream provided to the given OutputStreamCallaback after this method
* completed its execution
*/
FlowFile append(FlowFile source, OutputStreamCallback writer);
FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
/**
* Writes to the given FlowFile all content from the given content path.

View File

@ -46,6 +46,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
@ -1735,7 +1736,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead)) {
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -2180,9 +2182,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
recursionSet.add(source);

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.io;
import java.io.IOException;
import java.io.InputStream;
/**
* Wraps an existing InputStream, so that when {@link InputStream#close()} is
* called, the underlying InputStream is NOT closed but this InputStream can no
* longer be written to
*/
public class DisableOnCloseInputStream extends InputStream {
private final InputStream wrapped;
private boolean closed = false;
public DisableOnCloseInputStream(final InputStream wrapped) {
this.wrapped = wrapped;
}
@Override
public int read() throws IOException {
checkClosed();
return wrapped.read();
}
@Override
public int read(byte[] b) throws IOException {
checkClosed();
return wrapped.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkClosed();
return wrapped.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
checkClosed();
return wrapped.skip(n);
}
@Override
public int available() throws IOException {
return wrapped.available();
}
private void checkClosed() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public void mark(int readlimit) {
if (closed == false) {
wrapped.mark(readlimit);
}
}
@Override
public synchronized void reset() throws IOException {
checkClosed();
wrapped.reset();
}
@Override
public boolean markSupported() {
return wrapped.markSupported();
}
}

View File

@ -56,6 +56,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -65,6 +66,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.ObjectHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -235,6 +237,147 @@ public class TestStandardProcessSession {
assertEquals(0, contentRepo.getExistingClaims().size());
}
private void assertDisabled(final OutputStream outputStream) {
try {
outputStream.write(new byte[0]);
Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[])");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
outputStream.write(0);
Assert.fail("Expected OutputStream to be disabled; was able to call write(int)");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
outputStream.write(new byte[0], 0, 0);
Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[], int, int)");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
}
private void assertDisabled(final InputStream inputStream) {
try {
inputStream.read();
Assert.fail("Expected InputStream to be disabled; was able to call read()");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
inputStream.read(new byte[0]);
Assert.fail("Expected InputStream to be disabled; was able to call read(byte[])");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
inputStream.read(new byte[0], 0, 0);
Assert.fail("Expected InputStream to be disabled; was able to call read(byte[], int, int)");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
inputStream.reset();
Assert.fail("Expected InputStream to be disabled; was able to call reset()");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
try {
inputStream.skip(1L);
Assert.fail("Expected InputStream to be disabled; was able to call skip(long)");
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
}
@Test
public void testAppendAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream outputStream) throws IOException {
outputStreamHolder.set(outputStream);
}
});
assertDisabled(outputStreamHolder.get());
}
@Test
public void testReadAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream inputStream) throws IOException {
inputStreamHolder.set(inputStream);
}
});
assertDisabled(inputStreamHolder.get());
}
@Test
public void testStreamAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream input, final OutputStream output) throws IOException {
inputStreamHolder.set(input);
outputStreamHolder.set(output);
}
});
assertDisabled(inputStreamHolder.get());
assertDisabled(outputStreamHolder.get());
}
@Test
public void testWriteAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
outputStreamHolder.set(out);
}
});
assertDisabled(outputStreamHolder.get());
}
@Test
public void testCreateThenRollbackRemovesContent() throws IOException {
@ -998,6 +1141,12 @@ public class TestStandardProcessSession {
public ContentClaim create(boolean lossTolerant) throws IOException {
final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
claimantCounts.put(claim, new AtomicInteger(1));
final Path path = getPath(claim);
final Path parent = path.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
Files.createFile(getPath(claim));
return claim;
}