mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 02:28:31 +00:00
NIFI-11584 Improved MergeContent stream handling
- Allow ProcessSession to manage its own input streams and deprecated method that reads from FlowFile without allowing it This closes #7286 Signed-off-by: David Handermann <exceptionfactory@apache.org> (cherry picked from commit ca2a829d47a252544a764998d11318d2a2cd6e1c)
This commit is contained in:
parent
7ec61c22f1
commit
9007e6b2fc
@ -735,7 +735,11 @@ public interface ProcessSession {
|
|||||||
* FlowFile content; if an attempt is made to access the InputStream
|
* FlowFile content; if an attempt is made to access the InputStream
|
||||||
* provided to the given InputStreamCallback after this method completed its
|
* provided to the given InputStreamCallback after this method completed its
|
||||||
* execution
|
* execution
|
||||||
|
*
|
||||||
|
* @deprecated Restricting the ProcessSession's ability to manage its own streams should not be used. The need for this
|
||||||
|
* capability was obviated by the introduction of the {@link #migrate(ProcessSession, Collection)} and {@link #migrate(ProcessSession)} methods.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
|
void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -453,7 +453,7 @@ public class MergeContent extends BinFiles {
|
|||||||
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
|
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
|
||||||
|
|
||||||
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
|
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
|
||||||
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
||||||
groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
|
groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -462,7 +462,7 @@ public class MergeContent extends BinFiles {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
|
protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
|
||||||
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
||||||
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
|
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
|
||||||
} else {
|
} else {
|
||||||
binManager.setFileCountAttribute(null);
|
binManager.setFileCountAttribute(null);
|
||||||
@ -505,7 +505,7 @@ public class MergeContent extends BinFiles {
|
|||||||
final List<FlowFile> contents = bin.getContents();
|
final List<FlowFile> contents = bin.getContents();
|
||||||
final ProcessSession binSession = bin.getSession();
|
final ProcessSession binSession = bin.getSession();
|
||||||
|
|
||||||
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) {
|
||||||
final String error = getDefragmentValidationError(bin.getContents());
|
final String error = getDefragmentValidationError(bin.getContents());
|
||||||
|
|
||||||
// Fail the flow files and commit them
|
// Fail the flow files and commit them
|
||||||
@ -648,12 +648,7 @@ public class MergeContent extends BinFiles {
|
|||||||
final Iterator<FlowFile> itr = contents.iterator();
|
final Iterator<FlowFile> itr = contents.iterator();
|
||||||
while (itr.hasNext()) {
|
while (itr.hasNext()) {
|
||||||
final FlowFile flowFile = itr.next();
|
final FlowFile flowFile = itr.next();
|
||||||
bin.getSession().read(flowFile, false, new InputStreamCallback() {
|
bin.getSession().read(flowFile, in -> StreamUtils.copy(in, out));
|
||||||
@Override
|
|
||||||
public void process(final InputStream in) throws IOException {
|
|
||||||
StreamUtils.copy(in, out);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (itr.hasNext()) {
|
if (itr.hasNext()) {
|
||||||
if (demarcator != null) {
|
if (demarcator != null) {
|
||||||
@ -694,7 +689,7 @@ public class MergeContent extends BinFiles {
|
|||||||
|
|
||||||
private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFile> wrappers, final PropertyDescriptor descriptor) throws IOException {
|
private byte[] getDelimiterContent(final ProcessContext context, final List<FlowFile> wrappers, final PropertyDescriptor descriptor) throws IOException {
|
||||||
final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue();
|
final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue();
|
||||||
if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) {
|
if (DELIMITER_STRATEGY_FILENAME.getValue().equals(delimiterStrategyValue)) {
|
||||||
return getDelimiterFileContent(context, wrappers, descriptor);
|
return getDelimiterFileContent(context, wrappers, descriptor);
|
||||||
} else {
|
} else {
|
||||||
return getDelimiterTextContent(context, wrappers, descriptor);
|
return getDelimiterTextContent(context, wrappers, descriptor);
|
||||||
@ -881,7 +876,7 @@ public class MergeContent extends BinFiles {
|
|||||||
final OutputStream out = new NonCloseableOutputStream(bufferedOut);
|
final OutputStream out = new NonCloseableOutputStream(bufferedOut);
|
||||||
|
|
||||||
for (final FlowFile flowFile : contents) {
|
for (final FlowFile flowFile : contents) {
|
||||||
bin.getSession().read(flowFile, false, new InputStreamCallback() {
|
bin.getSession().read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
public void process(final InputStream rawIn) throws IOException {
|
||||||
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
||||||
@ -926,7 +921,7 @@ public class MergeContent extends BinFiles {
|
|||||||
|
|
||||||
private final int compressionLevel;
|
private final int compressionLevel;
|
||||||
|
|
||||||
private List<FlowFile> unmerged = new ArrayList<>();
|
private final List<FlowFile> unmerged = new ArrayList<>();
|
||||||
|
|
||||||
public ZipMerge(final int compressionLevel) {
|
public ZipMerge(final int compressionLevel) {
|
||||||
this.compressionLevel = compressionLevel;
|
this.compressionLevel = compressionLevel;
|
||||||
@ -993,7 +988,7 @@ public class MergeContent extends BinFiles {
|
|||||||
|
|
||||||
private class AvroMerge implements MergeBin {
|
private class AvroMerge implements MergeBin {
|
||||||
|
|
||||||
private List<FlowFile> unmerged = new ArrayList<>();
|
private final List<FlowFile> unmerged = new ArrayList<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FlowFile merge(final Bin bin, final ProcessContext context) {
|
public FlowFile merge(final Bin bin, final ProcessContext context) {
|
||||||
@ -1014,7 +1009,7 @@ public class MergeContent extends BinFiles {
|
|||||||
public void process(final OutputStream rawOut) throws IOException {
|
public void process(final OutputStream rawOut) throws IOException {
|
||||||
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
|
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
|
||||||
for (final FlowFile flowFile : contents) {
|
for (final FlowFile flowFile : contents) {
|
||||||
bin.getSession().read(flowFile, false, new InputStreamCallback() {
|
bin.getSession().read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(InputStream in) throws IOException {
|
public void process(InputStream in) throws IOException {
|
||||||
boolean canMerge = true;
|
boolean canMerge = true;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user