diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index d3de916572..7b855f2833 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -484,6 +484,9 @@ public interface ProcessSession { /** * Executes the given callback against the contents corresponding to the * given FlowFile. + * + * Note: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. * * @param source * @param reader @@ -498,9 +501,11 @@ public interface ProcessSession { * destroyed, and the session is automatically rolled back and what is left * of the FlowFile is destroyed. * @throws FlowFileAccessException if some IO problem occurs accessing - * FlowFile content + * FlowFile content; if an attempt is made to access the InputStream + * provided to the given InputStreamCallback after this method completed its + * execution */ - void read(FlowFile source, InputStreamCallback reader); + void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException; /** * Combines the content of all given source FlowFiles into a single given @@ -560,7 +565,10 @@ public interface ProcessSession { /** * Executes the given callback against the content corresponding to the - * given FlowFile + * given FlowFile. + * + * Note: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. * * @param source * @param writer @@ -576,13 +584,19 @@ public interface ProcessSession { * destroyed, and the session is automatically rolled back and what is left * of the FlowFile is destroyed. * @throws FlowFileAccessException if some IO problem occurs accessing - * FlowFile content + * FlowFile content; if an attempt is made to access the OutputStream + * provided to the given OutputStreamCallaback after this method completed + * its execution */ - FlowFile write(FlowFile source, OutputStreamCallback writer); + FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; /** * Executes the given callback against the content corresponding to the - * given flow file + * given flow file. + * + * Note: The InputStream & OutputStream provided to the given + * StreamCallback will not be accessible once this method has completed its + * execution. * * @param source * @param writer @@ -598,20 +612,28 @@ public interface ProcessSession { * destroyed, and the session is automatically rolled back and what is left * of the FlowFile is destroyed. * @throws FlowFileAccessException if some IO problem occurs accessing - * FlowFile content + * FlowFile content; if an attempt is made to access the InputStream or + * OutputStream provided to the given StreamCallback after this method + * completed its execution */ - FlowFile write(FlowFile source, StreamCallback writer); + FlowFile write(FlowFile source, StreamCallback writer) throws FlowFileAccessException; /** * Executes the given callback against the content corresponding to the * given FlowFile, such that any data written to the OutputStream of the * content will be appended to the end of FlowFile. + * + * Note: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. * * @param source * @param writer * @return + * @throws FlowFileAccessException if an attempt is made to access the + * OutputStream provided to the given OutputStreamCallaback after this method + * completed its execution */ - FlowFile append(FlowFile source, OutputStreamCallback writer); + FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; /** * Writes to the given FlowFile all content from the given content path. diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 8d2e4567ef..e5cd03ea19 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -46,6 +46,7 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.io.ByteCountingInputStream; import org.apache.nifi.controller.repository.io.ByteCountingOutputStream; +import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream; @@ -1735,7 +1736,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead)) { + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -2180,9 +2182,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); - final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { recursionSet.add(source); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java new file mode 100644 index 0000000000..ddcf6c9bf0 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.io; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Wraps an existing InputStream, so that when {@link InputStream#close()} is + * called, the underlying InputStream is NOT closed but this InputStream can no + * longer be written to + */ +public class DisableOnCloseInputStream extends InputStream { + + private final InputStream wrapped; + private boolean closed = false; + + public DisableOnCloseInputStream(final InputStream wrapped) { + this.wrapped = wrapped; + } + + @Override + public int read() throws IOException { + checkClosed(); + return wrapped.read(); + } + + @Override + public int read(byte[] b) throws IOException { + checkClosed(); + return wrapped.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkClosed(); + return wrapped.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + checkClosed(); + return wrapped.skip(n); + } + + @Override + public int available() throws IOException { + return wrapped.available(); + } + + private void checkClosed() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public void mark(int readlimit) { + if (closed == false) { + wrapped.mark(readlimit); + } + } + + @Override + public synchronized void reset() throws IOException { + checkClosed(); + wrapped.reset(); + } + + @Override + public boolean markSupported() { + return wrapped.markSupported(); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 1ff63c545b..ca68725bf1 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -56,6 +56,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.MissingFlowFileException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; @@ -65,6 +66,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.ObjectHolder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -235,6 +237,147 @@ public class TestStandardProcessSession { assertEquals(0, contentRepo.getExistingClaims().size()); } + private void assertDisabled(final OutputStream outputStream) { + try { + outputStream.write(new byte[0]); + Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[])"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + outputStream.write(0); + Assert.fail("Expected OutputStream to be disabled; was able to call write(int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + outputStream.write(new byte[0], 0, 0); + Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[], int, int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + } + + private void assertDisabled(final InputStream inputStream) { + try { + inputStream.read(); + Assert.fail("Expected InputStream to be disabled; was able to call read()"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.read(new byte[0]); + Assert.fail("Expected InputStream to be disabled; was able to call read(byte[])"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.read(new byte[0], 0, 0); + Assert.fail("Expected InputStream to be disabled; was able to call read(byte[], int, int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.reset(); + Assert.fail("Expected InputStream to be disabled; was able to call reset()"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.skip(1L); + Assert.fail("Expected InputStream to be disabled; was able to call skip(long)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + } + + @Test + public void testAppendAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream outputStream) throws IOException { + outputStreamHolder.set(outputStream); + } + }); + assertDisabled(outputStreamHolder.get()); + } + + @Test + public void testReadAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder inputStreamHolder = new ObjectHolder<>(null); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream inputStream) throws IOException { + inputStreamHolder.set(inputStream); + } + }); + assertDisabled(inputStreamHolder.get()); + } + + @Test + public void testStreamAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder inputStreamHolder = new ObjectHolder<>(null); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream input, final OutputStream output) throws IOException { + inputStreamHolder.set(input); + outputStreamHolder.set(output); + } + }); + assertDisabled(inputStreamHolder.get()); + assertDisabled(outputStreamHolder.get()); + } + + @Test + public void testWriteAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + outputStreamHolder.set(out); + } + }); + assertDisabled(outputStreamHolder.get()); + } + @Test public void testCreateThenRollbackRemovesContent() throws IOException { @@ -998,6 +1141,12 @@ public class TestStandardProcessSession { public ContentClaim create(boolean lossTolerant) throws IOException { final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); claimantCounts.put(claim, new AtomicInteger(1)); + final Path path = getPath(claim); + final Path parent = path.getParent(); + if (Files.exists(parent) == false) { + Files.createDirectories(parent); + } + Files.createFile(getPath(claim)); return claim; }