From 26fcf8158a3e10c490db11340e95e1c7943eae06 Mon Sep 17 00:00:00 2001
From: Tamas Palfy
Date: Thu, 9 Jan 2020 13:47:33 +0100
Subject: [PATCH] NIFI-6992 - Add "Batch Size" property to GetHDFSFileInfo processor

- Added "Batch Size" property; takes effect only if "Destination" is set to
  "Content" and "Grouping" is set to "None"

NIFI-6992 - Add "Batch Size" property to GetHDFSFileInfo processor
- Added validation for 'Batch Size' in 'GetHDFSFileInfo'.

NIFI-6992 - Changed 'GetHDFSFileInfo.BATCH_SIZE' validator from
'NON_NEGATIVE_INTEGER_VALIDATOR' to 'POSITIVE_INTEGER_VALIDATOR'. Added more tests.

NIFI-6992 - Removed 'AllowEmptyValidator'. 'Batch Size' in 'GetHDFSFileInfo'
allows null but not empty String.

NIFI-6992 - 'Batch Size' in 'GetHDFSFileInfo' allows null but not empty
String - cont.

NIFI-6992 - Fix: Unused import.

This closes #3966.

Signed-off-by: Peter Turcsanyi
---
 .../processors/hadoop/GetHDFSFileInfo.java         | 161 ++++++++++++++---
 .../hadoop/TestGetHDFSFileInfo.java                | 169 ++++++++++++++++++
 2 files changed, 304 insertions(+), 26 deletions(-)
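Reviewer note (commentary only, not part of the applied patch): a minimal sketch of
the intended batching behavior, distilled from the new tests below. It assumes a
TestRunner configured as in TestGetHDFSFileInfo, with the test fixture's 9 matching
objects under /some/home/mydir:

    runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT);
    runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_NONE);
    runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, "4");
    runner.run();
    // 9 records are emitted in batches of 4, 4 and 1 -> three flowfiles,
    // each holding newline-separated JSON records:
    runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 3);

With any other 'Destination'/'Grouping' combination a set 'Batch Size' fails
validation, and an unset 'Batch Size' defaults to one record per flowfile.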
" @@ -234,6 +246,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor { props.add(IGNORE_DOTTED_DIRS); props.add(IGNORE_DOTTED_FILES); props.add(GROUPING); + props.add(BATCH_SIZE); props.add(DESTINATION); return props; } @@ -255,6 +268,30 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor { req = null; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List validationResults = new ArrayList<>(super.customValidate(validationContext)); + + String destination = validationContext.getProperty(DESTINATION).getValue(); + String grouping = validationContext.getProperty(GROUPING).getValue(); + String batchSize = validationContext.getProperty(BATCH_SIZE).getValue(); + + if ( + (!DESTINATION_CONTENT.getValue().equals(destination) || !GROUP_NONE.getValue().equals(grouping)) + && batchSize != null + ) { + validationResults.add(new ValidationResult.Builder() + .valid(false) + .subject(BATCH_SIZE.getDisplayName()) + .explanation("'" + BATCH_SIZE.getDisplayName() + "' is applicable only when " + + "'" + DESTINATION.getDisplayName() + "'='" + DESTINATION_CONTENT.getDisplayName() + "' and " + + "'" + GROUPING.getDisplayName() + "'='" + GROUP_NONE.getDisplayName() + "'") + .build()); + } + + return validationResults; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile ff = null; @@ -286,7 +323,9 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor { try { final FileSystem hdfs = getFileSystem(); UserGroupInformation ugi = getUserGroupInformation(); - HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false); + ExecutionContext executionContext = new ExecutionContext(); + HDFSObjectInfoDetails res = walkHDFSTree(context, session, executionContext, ff, hdfs, ugi, req, null, false); + executionContext.finish(session); if (res == null) { ff = session.putAttribute(ff, "hdfs.status", "Path not found: " + req.fullPath); session.transfer(ff, REL_NOT_FOUND); @@ -320,8 +359,10 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor { /* * Walks thru HDFS tree. This method will return null to the main if there is no provided path existing. 
     */
-    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, FlowFile origFF, final FileSystem hdfs,
-            final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly) throws Exception{
+    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, ExecutionContext executionContext,
+            FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent,
+            final boolean statsOnly
+    ) throws Exception{
 
         final HDFSObjectInfoDetails p = parent;
 
@@ -334,7 +375,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         }
 
         if (parent.isFile() && p == null) {
            //single file path requested and found, lets send to output:
-            processHDFSObject(context, session, origFF, req, parent, true);
+            processHDFSObject(context, session, executionContext, origFF, req, parent, true);
             return parent;
         }
@@ -345,7 +386,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
         }catch (IOException e) {
             parent.error = "Couldn't list directory: " + e;
-            processHDFSObject(context, session, origFF, req, parent, p == null);
+            processHDFSObject(context, session, executionContext, origFF, req, parent, p == null);
             return parent; //File not found exception, or access denied - don't interrupt, just don't list
         }
         if (listFSt != null) {
@@ -353,7 +394,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
                 HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
                 if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
-                    o = walkHDFSTree(context, session, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
+                    o = walkHDFSTree(context, session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
                     parent.countDirs += o.countDirs;
                     parent.totalLen += o.totalLen;
                     parent.countFiles += o.countFiles;
@@ -370,12 +411,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 if (vo != null && !statsOnly) {
                     parent.addChild(vo);
                     if (vo.isFile() && !vo.isSymlink()) {
-                        processHDFSObject(context, session, origFF, req, vo, false);
+                        processHDFSObject(context, session, executionContext, origFF, req, vo, false);
                     }
                 }
             }
         }
         if (!statsOnly) {
-            processHDFSObject(context, session, origFF, req, parent, p==null);
+            processHDFSObject(context, session, executionContext, origFF, req, parent, p==null);
         }
         if (req.groupping != Groupping.ALL) {
             parent.setChildren(null); //we need children in full tree only when single output requested.
@@ -421,8 +462,15 @@
     * Checks whether HDFS object should be sent to output.
     * If it should be sent, new flowfile will be created, its content and attributes will be populated according to other request params.
     */
-    protected HDFSObjectInfoDetails processHDFSObject(final ProcessContext context, final ProcessSession session,
-            FlowFile origFF, final HDFSFileInfoRequest req, final HDFSObjectInfoDetails o, final boolean isRoot) {
+    protected HDFSObjectInfoDetails processHDFSObject(
+            final ProcessContext context,
+            final ProcessSession session,
+            final ExecutionContext executionContext,
+            FlowFile origFF,
+            final HDFSFileInfoRequest req,
+            final HDFSObjectInfoDetails o,
+            final boolean isRoot
+    ) {
         if (o.isFile() && req.groupping != Groupping.NONE) {
             return null; //there is grouping by either root directory or every directory, no need to print separate files.
         }
@@ -432,7 +480,8 @@
         if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
             return null;
         }
-        FlowFile ff = session.create(origFF);
+
+        FlowFile ff = getReadyFlowFile(executionContext, session, origFF);
 
         //if destination type is content - always add mime type
         if (req.isDestContent) {
@@ -441,53 +490,91 @@
         //won't combine conditions for similar actions for better readability and maintenance.
         if (o.isFile() && isRoot && req.isDestContent) {
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isFile() && isRoot && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = addAsAttributes(session, o, ff);
         // ------------------------------
         }else if (o.isFile() && req.isDestContent) {
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isFile() && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = addAsAttributes(session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = addAsAttributes(session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
             o.setChildren(null);
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = addAsAttributes(session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
-            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
+            ff = addAsAttributes(session, o, ff);
+            ff = addFullTreeToAttribute(session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
-            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+            ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
         }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
-            ff = session.putAllAttributes(ff, o.toAttributesMap());
-            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
+            ff = addAsAttributes(session, o, ff);
+            ff = addFullTreeToAttribute(session, o, ff);
         }else {
             getLogger().error("Illegal State!");
             session.remove(ff);
             return null;
         }
 
-        session.transfer(ff, REL_SUCCESS);
+        executionContext.flowfile = ff;
+        finishProcessing(executionContext, session);
+
         return o;
     }
 
+    private FlowFile getReadyFlowFile(ExecutionContext executionContext, ProcessSession session, FlowFile origFF) {
+        if (executionContext.flowfile == null) {
+            executionContext.flowfile = session.create(origFF);
+        }
+
+        return executionContext.flowfile;
+    }
+
+    private void finishProcessing(ExecutionContext executionContext, ProcessSession session) {
+        executionContext.nrOfWaitingHDFSObjects++;
+
+        if (req.groupping == Groupping.NONE && req.isDestContent && executionContext.nrOfWaitingHDFSObjects < req.batchSize) {
+            return;
+        }
+
+        session.transfer(executionContext.flowfile, REL_SUCCESS);
+
+        executionContext.reset();
+    }
+
+    private FlowFile addAsContent(ExecutionContext executionContext, ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
+        if (executionContext.nrOfWaitingHDFSObjects > 0) {
+            ff = session.append(ff, (out) -> out.write(("\n").getBytes()));
+        }
+
+        return session.append(ff, (out) -> out.write((o.toJsonString()).getBytes()));
+    }
+
+    private FlowFile addAsAttributes(ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
+        return session.putAllAttributes(ff, o.toAttributesMap());
+    }
+
+    private FlowFile addFullTreeToAttribute(ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
+        return session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
+    }
+
     /*
     * Returns permissions in readable format like rwxr-xr-x (755)
     */
@@ -548,6 +635,10 @@
         req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
 
         req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
+        req.batchSize = Optional.ofNullable(context.getProperty(BATCH_SIZE))
+                .filter(propertyValue -> propertyValue.getValue() != null)
+                .map(PropertyValue::asInteger)
+                .orElse(1);
 
         v = context.getProperty(DESTINATION).getValue();
         if (DESTINATION_CONTENT.getValue().equals(v)) {
@@ -596,6 +687,23 @@
         return req;
     }
 
+    static class ExecutionContext {
+        int nrOfWaitingHDFSObjects;
+        FlowFile flowfile;
+
+        void reset() {
+            nrOfWaitingHDFSObjects = 0;
+            flowfile = null;
+        }
+
+        void finish(ProcessSession session) {
+            if (flowfile != null) {
+                session.transfer(flowfile, REL_SUCCESS);
+                flowfile = null;
+            }
+        }
+    }
+
     /*
     * Keeps all request details in single object.
     */
@@ -634,6 +742,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         boolean isIgnoreDotDirs;
         boolean isDestContent;
         Groupping groupping;
+        int batchSize;
     }
 
     /*
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
index ca99a4967f..197e268101 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
@@ -27,10 +27,15 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
@@ -51,6 +57,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestGetHDFSFileInfo {
+    private static final Pattern SINGLE_JSON_PATTERN = Pattern.compile("^\\{[^\\}]*\\}$");
 
     private TestRunner runner;
     private GetHDFSFileInfoWithMockedFileSystem proc;
@@ -69,6 +76,69 @@ public class TestGetHDFSFileInfo {
         runner.setProperty(GetHDFSFileInfo.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
     }
 
+    @Test
+    public void testInvalidBatchSizeWhenDestinationAndGroupingDoesntAllowBatchSize() throws Exception {
+        Arrays.asList("1", "2", "100").forEach(
+            validBatchSize -> {
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, validBatchSize, false);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, validBatchSize, false);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, validBatchSize, false);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, validBatchSize, false);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, validBatchSize, false);
+            }
+        );
+    }
+
+    @Test
+    public void testInvalidBatchSizeWhenValueIsInvalid() throws Exception {
+        Arrays.asList("-1", "0", "someString").forEach(
+            inValidBatchSize -> {
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, inValidBatchSize, false);
+            }
+        );
+    }
+
+    @Test
+    public void testValidBatchSize() throws Exception {
+        Arrays.asList("1", "2", "100").forEach(
+            validBatchSize -> {
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, validBatchSize, true);
+            }
+        );
+
+        Arrays.asList((String)null).forEach(
+            nullBatchSize -> {
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, nullBatchSize, true);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, nullBatchSize, true);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, nullBatchSize, true);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, nullBatchSize, true);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, nullBatchSize, true);
+                testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, nullBatchSize, true);
+            }
+        );
+    }
+
+    private void testValidateBatchSize(AllowableValue destination, AllowableValue grouping, String batchSize, boolean expectedValid) {
+        runner.clearProperties();
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
+        runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
+        runner.setProperty(GetHDFSFileInfo.DESTINATION, destination);
+        runner.setProperty(GetHDFSFileInfo.GROUPING, grouping);
+        if (batchSize != null) {
+            runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, "" + batchSize);
+        }
+
+        if (expectedValid) {
+            runner.assertValid();
+        } else {
+            runner.assertNotValid();
+        }
+    }
+
     @Test
     public void testNoRunOnIncomingConnectionExists() throws InterruptedException {
 
@@ -563,6 +633,105 @@ public class TestGetHDFSFileInfo {
         Assert.assertEquals(matchCount, 5);
     }
 
+    @Test
+    public void testBatchSizeWithDestAttributesGroupAllBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, 1);
+    }
+
+    @Test
+    public void testBatchSizeWithDestAttributesGroupDirBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, 5);
+    }
+
+    @Test
+    public void testBatchSizeWithDestAttributesGroupNoneBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, 9);
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupAllBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, 1);
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupDirBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, 5);
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSizeNull() throws Exception {
+        testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 9);
+
+        checkContentSizes(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize1() throws Exception {
+        testBatchSize("1", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 9);
+
+        checkContentSizes(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize3() throws Exception {
+        testBatchSize("3", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 3);
+
+        checkContentSizes(Arrays.asList(3, 3, 3));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize4() throws Exception {
+        testBatchSize("4", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 3);
+
+        checkContentSizes(Arrays.asList(4, 4, 1));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize5() throws Exception {
+        testBatchSize("5", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 2);
+
+        checkContentSizes(Arrays.asList(5, 4));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize9() throws Exception {
+        testBatchSize("9", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 1);
+
+        checkContentSizes(Arrays.asList(9));
+    }
+
+    @Test
+    public void testBatchSizeWithDestContentGroupNoneBatchSize100() throws Exception {
+        testBatchSize("100", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 1);
+
+        checkContentSizes(Arrays.asList(9));
+    }
+
+    private void testBatchSize(String batchSize, AllowableValue destination, AllowableValue grouping, int expectedNrTransferredToSuccess) {
+        setFileSystemBasicTree(proc.fileSystem);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
+        runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
+        runner.setProperty(GetHDFSFileInfo.DESTINATION, destination);
+        runner.setProperty(GetHDFSFileInfo.GROUPING, grouping);
+        if (batchSize != null) {
+            runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, batchSize);
+        }
+
+        runner.run();
+
+        runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, expectedNrTransferredToSuccess);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0);
+    }
+
+    private void checkContentSizes(List<Integer> expectedNumberOfRecords) {
+        List<Integer> actualNumberOfRecords = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS).stream()
+            .map(MockFlowFile::toByteArray)
+            .map(String::new)
+            .map(
+                content -> Arrays.stream(content.split("\n"))
+                    .filter(line -> SINGLE_JSON_PATTERN.matcher(line).matches())
+                    .count()
+            )
+            .map(Long::intValue)
+            .collect(Collectors.toList());
+
+        assertEquals(expectedNumberOfRecords, actualNumberOfRecords);
+    }
+
     /*
     * For all basic tests, this provides a structure of files in dirs:
     * Total number of dirs: 9 (1 root, 4 dotted)