NIFI-11584 Improved MergeContent stream handling

- Allow ProcessSession to manage its own input streams and deprecate the method that reads from a FlowFile without allowing it

This closes #7286

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-05-23 15:59:20 -04:00 committed by exceptionfactory
parent 1642315c0f
commit ca2a829d47
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 13 additions and 14 deletions

View File

@ -735,7 +735,11 @@ public interface ProcessSession {
* FlowFile content; if an attempt is made to access the InputStream
* provided to the given InputStreamCallback after this method completed its
* execution
*
* @deprecated The option to restrict the ProcessSession's ability to manage its own streams should no longer be used;
* use {@link #read(FlowFile, InputStreamCallback)} instead. The need for this capability was obviated by the
* introduction of the {@link #migrate(ProcessSession, Collection)} and {@link #migrate(ProcessSession)} methods.
*/
@Deprecated
void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
/**

View File

@ -453,7 +453,7 @@ public class MergeContent extends BinFiles {
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
}
@ -462,7 +462,7 @@ public class MergeContent extends BinFiles {
@Override
protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
} else {
binManager.setFileCountAttribute(null);
@ -505,7 +505,7 @@ public class MergeContent extends BinFiles {
final List<FlowFile> contents = bin.getContents();
final ProcessSession binSession = bin.getSession();
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
final String error = getDefragmentValidationError(bin.getContents());
// Fail the flow files and commit them
@ -648,12 +648,7 @@ public class MergeContent extends BinFiles {
final Iterator<FlowFile> itr = contents.iterator();
while (itr.hasNext()) {
final FlowFile flowFile = itr.next();
bin.getSession().read(flowFile, false, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, out);
}
});
bin.getSession().read(flowFile, in -> StreamUtils.copy(in, out));
if (itr.hasNext()) {
if (demarcator != null) {
@ -694,7 +689,7 @@ public class MergeContent extends BinFiles {
private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFile> wrappers, final PropertyDescriptor descriptor) throws IOException {
final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue();
if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) {
if (DELIMITER_STRATEGY_FILENAME.getValue().equals(delimiterStrategyValue)) {
return getDelimiterFileContent(context, wrappers, descriptor);
} else {
return getDelimiterTextContent(context, wrappers, descriptor);
@ -881,7 +876,7 @@ public class MergeContent extends BinFiles {
final OutputStream out = new NonCloseableOutputStream(bufferedOut);
for (final FlowFile flowFile : contents) {
bin.getSession().read(flowFile, false, new InputStreamCallback() {
bin.getSession().read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
@ -919,7 +914,7 @@ public class MergeContent extends BinFiles {
private final int compressionLevel;
private List<FlowFile> unmerged = new ArrayList<>();
private final List<FlowFile> unmerged = new ArrayList<>();
public ZipMerge(final int compressionLevel) {
this.compressionLevel = compressionLevel;
@ -986,7 +981,7 @@ public class MergeContent extends BinFiles {
private class AvroMerge implements MergeBin {
private List<FlowFile> unmerged = new ArrayList<>();
private final List<FlowFile> unmerged = new ArrayList<>();
@Override
public FlowFile merge(final Bin bin, final ProcessContext context) {
@ -1007,7 +1002,7 @@ public class MergeContent extends BinFiles {
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
for (final FlowFile flowFile : contents) {
bin.getSession().read(flowFile, false, new InputStreamCallback() {
bin.getSession().read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
boolean canMerge = true;