NIFI-6992 - Add "Batch Size" property to GetHDFSFileInfo processor - Added "Batch Size" property; it takes effect only if "Destination" is set to "Content" and "Grouping" is set to "None".

NIFI-6992 - Add "Batch Size" property to GetHDFSFileInfo processor - Added validation for 'Batch Size' in 'GetHDFSFileInfo'.
NIFI-6992 - Changed 'GetHDFSFileInfo.BATCH_SIZE' validator from 'NON_NEGATIVE_INTEGER_VALIDATOR' to 'POSITIVE_INTEGER_VALIDATOR'. Added more tests.
NIFI-6992 - Removed 'AllowEmptyValidator'. 'Batch Size' in 'GetHDFSFileInfo' allows null but not empty String.
NIFI-6992 - 'Batch Size' in 'GetHDFSFileInfo' allows null but not empty String - cont.
NIFI-6992 - Fix: Unused import.

This closes #3966.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Tamas Palfy on 2020-01-09 13:47:33 +01:00; committed by Peter Turcsanyi
parent bec1f772db
commit 26fcf8158a
2 changed files with 304 additions and 26 deletions
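For orientation, a minimal sketch of how the new 'Batch Size' property described in the commit message is configured from a unit test using the NiFi mock framework. The class and method names are hypothetical and not part of this commit; the property constants, the path "/some/home/mydir", and the batch size of 3 are taken from the test diff below, and nifi-mock is assumed to be on the test classpath.

package org.apache.nifi.processors.hadoop;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

// Hypothetical sketch; placed in the processor's package so the package-private
// AllowableValue constants (DESTINATION_CONTENT, GROUP_NONE) are visible,
// just as in TestGetHDFSFileInfo below.
public class GetHDFSFileInfoBatchSizeSketch {

    @Test
    public void configureBatchSize() {
        // No real HDFS access is needed for property validation.
        final TestRunner runner = TestRunners.newTestRunner(GetHDFSFileInfo.class);
        runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
        // 'Batch Size' is honored (and passes validation) only when Destination=Content
        // and Group Results=None, per the commit message above.
        runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT);
        runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_NONE);
        runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, "3"); // up to 3 file-info JSON records per output flowfile
        runner.assertValid();
    }
}

With this configuration the processor writes the scanned file and directory records as JSON to flowfile content, up to three records per flowfile, instead of one flowfile per record.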

GetHDFSFileInfo.java

@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
@@ -45,6 +46,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -178,6 +181,15 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
.defaultValue(GROUP_ALL.getValue())
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Batch Size")
.name("gethdfsfileinfo-batch-size")
.description("Number of records to put into an output flowfile when 'Destination' is set to 'Content'"
+ " and 'Group Results' is set to 'None'")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
"Details of given HDFS object will be stored in attributes of flowfile. "
+ "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
@@ -234,6 +246,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
props.add(IGNORE_DOTTED_DIRS);
props.add(IGNORE_DOTTED_FILES);
props.add(GROUPING);
props.add(BATCH_SIZE);
props.add(DESTINATION);
return props;
}
@@ -255,6 +268,30 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
req = null;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
String destination = validationContext.getProperty(DESTINATION).getValue();
String grouping = validationContext.getProperty(GROUPING).getValue();
String batchSize = validationContext.getProperty(BATCH_SIZE).getValue();
if (
(!DESTINATION_CONTENT.getValue().equals(destination) || !GROUP_NONE.getValue().equals(grouping))
&& batchSize != null
) {
validationResults.add(new ValidationResult.Builder()
.valid(false)
.subject(BATCH_SIZE.getDisplayName())
.explanation("'" + BATCH_SIZE.getDisplayName() + "' is applicable only when " +
"'" + DESTINATION.getDisplayName() + "'='" + DESTINATION_CONTENT.getDisplayName() + "' and " +
"'" + GROUPING.getDisplayName() + "'='" + GROUP_NONE.getDisplayName() + "'")
.build());
}
return validationResults;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile ff = null;
@@ -286,7 +323,9 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
try {
final FileSystem hdfs = getFileSystem();
UserGroupInformation ugi = getUserGroupInformation();
HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false);
ExecutionContext executionContext = new ExecutionContext();
HDFSObjectInfoDetails res = walkHDFSTree(context, session, executionContext, ff, hdfs, ugi, req, null, false);
executionContext.finish(session);
if (res == null) {
ff = session.putAttribute(ff, "hdfs.status", "Path not found: " + req.fullPath);
session.transfer(ff, REL_NOT_FOUND);
@@ -320,8 +359,10 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
/*
* Walks through the HDFS tree. Returns null to the caller if the provided path does not exist.
*/
protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, FlowFile origFF, final FileSystem hdfs,
final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly) throws Exception{
protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, ExecutionContext executionContext,
FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent,
final boolean statsOnly
) throws Exception{
final HDFSObjectInfoDetails p = parent;
@@ -334,7 +375,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
}
if (parent.isFile() && p == null) {
//single file path requested and found, lets send to output:
processHDFSObject(context, session, origFF, req, parent, true);
processHDFSObject(context, session, executionContext, origFF, req, parent, true);
return parent;
}
@@ -345,7 +386,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
}catch (IOException e) {
parent.error = "Couldn't list directory: " + e;
processHDFSObject(context, session, origFF, req, parent, p == null);
processHDFSObject(context, session, executionContext, origFF, req, parent, p == null);
return parent; //File not found exception, or access denied - don't interrupt, just don't list
}
if (listFSt != null) {
@@ -353,7 +394,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
o = walkHDFSTree(context, session, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
o = walkHDFSTree(context, session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
parent.countDirs += o.countDirs;
parent.totalLen += o.totalLen;
parent.countFiles += o.countFiles;
@@ -370,12 +411,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
if (vo != null && !statsOnly) {
parent.addChild(vo);
if (vo.isFile() && !vo.isSymlink()) {
processHDFSObject(context, session, origFF, req, vo, false);
processHDFSObject(context, session, executionContext, origFF, req, vo, false);
}
}
}
if (!statsOnly) {
processHDFSObject(context, session, origFF, req, parent, p==null);
processHDFSObject(context, session, executionContext, origFF, req, parent, p==null);
}
if (req.groupping != Groupping.ALL) {
parent.setChildren(null); //we need children in full tree only when single output requested.
@@ -421,8 +462,15 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
* Checks whether the HDFS object should be sent to output.
* If so, a new flowfile is created and its content and attributes are populated according to the other request params.
*/
protected HDFSObjectInfoDetails processHDFSObject(final ProcessContext context, final ProcessSession session,
FlowFile origFF, final HDFSFileInfoRequest req, final HDFSObjectInfoDetails o, final boolean isRoot) {
protected HDFSObjectInfoDetails processHDFSObject(
final ProcessContext context,
final ProcessSession session,
final ExecutionContext executionContext,
FlowFile origFF,
final HDFSFileInfoRequest req,
final HDFSObjectInfoDetails o,
final boolean isRoot
) {
if (o.isFile() && req.groupping != Groupping.NONE) {
return null; //there is grouping by either root directory or every directory, no need to print separate files.
}
@@ -432,7 +480,8 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
return null;
}
FlowFile ff = session.create(origFF);
FlowFile ff = getReadyFlowFile(executionContext, session, origFF);
//if destination type is content - always add mime type
if (req.isDestContent) {
@@ -441,53 +490,91 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
//won't combine conditions for similar actions for better readability and maintenance.
if (o.isFile() && isRoot && req.isDestContent) {
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isFile() && isRoot && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = addAsAttributes(session, o, ff);
// ------------------------------
}else if (o.isFile() && req.isDestContent) {
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isFile() && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = addAsAttributes(session, o, ff);
// ------------------------------
}else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = addAsAttributes(session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
o.setChildren(null);
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = addAsAttributes(session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
ff = addAsAttributes(session, o, ff);
ff = addFullTreeToAttribute(session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
ff = addAsContent(executionContext, session, o, ff);
// ------------------------------
}else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
ff = session.putAllAttributes(ff, o.toAttributesMap());
ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
ff = addAsAttributes(session, o, ff);
ff = addFullTreeToAttribute(session, o, ff);
}else {
getLogger().error("Illegal State!");
session.remove(ff);
return null;
}
session.transfer(ff, REL_SUCCESS);
executionContext.flowfile = ff;
finishProcessing(executionContext, session);
return o;
}
private FlowFile getReadyFlowFile(ExecutionContext executionContext, ProcessSession session, FlowFile origFF) {
if (executionContext.flowfile == null) {
executionContext.flowfile = session.create(origFF);
}
return executionContext.flowfile;
}
private void finishProcessing(ExecutionContext executionContext, ProcessSession session) {
executionContext.nrOfWaitingHDFSObjects++;
if (req.groupping == Groupping.NONE && req.isDestContent && executionContext.nrOfWaitingHDFSObjects < req.batchSize) {
return;
}
session.transfer(executionContext.flowfile, REL_SUCCESS);
executionContext.reset();
}
private FlowFile addAsContent(ExecutionContext executionContext, ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
if (executionContext.nrOfWaitingHDFSObjects > 0) {
ff = session.append(ff, (out) -> out.write(("\n").getBytes()));
}
return session.append(ff, (out) -> out.write((o.toJsonString()).getBytes()));
}
private FlowFile addAsAttributes(ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
return session.putAllAttributes(ff, o.toAttributesMap());
}
private FlowFile addFullTreeToAttribute(ProcessSession session, HDFSObjectInfoDetails o, FlowFile ff) {
return session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
}
/*
* Returns permissions in readable format like rwxr-xr-x (755)
*/
@@ -548,6 +635,10 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
req.batchSize = Optional.ofNullable(context.getProperty(BATCH_SIZE))
.filter(propertyValue -> propertyValue.getValue() != null)
.map(PropertyValue::asInteger)
.orElse(1);
v = context.getProperty(DESTINATION).getValue();
if (DESTINATION_CONTENT.getValue().equals(v)) {
@@ -596,6 +687,23 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
return req;
}
static class ExecutionContext {
int nrOfWaitingHDFSObjects;
FlowFile flowfile;
void reset() {
nrOfWaitingHDFSObjects = 0;
flowfile = null;
}
void finish(ProcessSession session) {
if (flowfile != null) {
session.transfer(flowfile, REL_SUCCESS);
flowfile = null;
}
}
}
/*
* Keeps all request details in single object.
*/
@@ -634,6 +742,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
boolean isIgnoreDotDirs;
boolean isDestContent;
Groupping groupping;
int batchSize;
}
/*

TestGetHDFSFileInfo.java

@@ -27,10 +27,15 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
@@ -51,6 +57,7 @@ import org.junit.Before;
import org.junit.Test;
public class TestGetHDFSFileInfo {
private static final Pattern SINGLE_JSON_PATTERN = Pattern.compile("^\\{[^\\}]*\\}$");
private TestRunner runner;
private GetHDFSFileInfoWithMockedFileSystem proc;
@@ -69,6 +76,69 @@ public class TestGetHDFSFileInfo {
runner.setProperty(GetHDFSFileInfo.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
}
@Test
public void testInvalidBatchSizeWhenDestinationAndGroupingDoesntAllowBatchSize() throws Exception {
Arrays.asList("1", "2", "100").forEach(
validBatchSize -> {
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, validBatchSize, false);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, validBatchSize, false);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, validBatchSize, false);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, validBatchSize, false);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, validBatchSize, false);
}
);
}
@Test
public void testInvalidBatchSizeWhenValueIsInvalid() throws Exception {
Arrays.asList("-1", "0", "someString").forEach(
inValidBatchSize -> {
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, inValidBatchSize, false);
}
);
}
@Test
public void testValidBatchSize() throws Exception {
Arrays.asList("1", "2", "100").forEach(
validBatchSize -> {
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, validBatchSize, true);
}
);
Arrays.asList((String)null).forEach(
nullBatchSize -> {
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, nullBatchSize, true);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, nullBatchSize, true);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, nullBatchSize, true);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, nullBatchSize, true);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, nullBatchSize, true);
testValidateBatchSize(GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, nullBatchSize, true);
}
);
}
private void testValidateBatchSize(AllowableValue destination, AllowableValue grouping, String batchSize, boolean expectedValid) {
runner.clearProperties();
runner.setIncomingConnection(false);
runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true");
runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
runner.setProperty(GetHDFSFileInfo.DESTINATION, destination);
runner.setProperty(GetHDFSFileInfo.GROUPING, grouping);
if (batchSize != null) {
runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, "" + batchSize);
}
if (expectedValid) {
runner.assertValid();
} else {
runner.assertNotValid();
}
}
@Test
public void testNoRunOnIncomingConnectionExists() throws InterruptedException {
@@ -563,6 +633,105 @@ public class TestGetHDFSFileInfo {
Assert.assertEquals(matchCount, 5);
}
@Test
public void testBatchSizeWithDestAttributesGroupAllBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_ALL, 1);
}
@Test
public void testBatchSizeWithDestAttributesGroupDirBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_PARENT_DIR, 5);
}
@Test
public void testBatchSizeWithDestAttributesGroupNoneBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_ATTRIBUTES, GetHDFSFileInfo.GROUP_NONE, 9);
}
@Test
public void testBatchSizeWithDestContentGroupAllBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_ALL, 1);
}
@Test
public void testBatchSizeWithDestContentGroupDirBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_PARENT_DIR, 5);
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSizeNull() throws Exception {
testBatchSize(null, GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 9);
checkContentSizes(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize1() throws Exception {
testBatchSize("1", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 9);
checkContentSizes(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize3() throws Exception {
testBatchSize("3", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 3);
checkContentSizes(Arrays.asList(3, 3, 3));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize4() throws Exception {
testBatchSize("4", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 3);
checkContentSizes(Arrays.asList(4, 4, 1));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize5() throws Exception {
testBatchSize("5", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 2);
checkContentSizes(Arrays.asList(5, 4));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize9() throws Exception {
testBatchSize("9", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 1);
checkContentSizes(Arrays.asList(9));
}
@Test
public void testBatchSizeWithDestContentGroupNoneBatchSize100() throws Exception {
testBatchSize("100", GetHDFSFileInfo.DESTINATION_CONTENT, GetHDFSFileInfo.GROUP_NONE, 1);
checkContentSizes(Arrays.asList(9));
}
private void testBatchSize(String batchSize, AllowableValue destination, AllowableValue grouping, int expectedNrTransferredToSuccess) {
setFileSystemBasicTree(proc.fileSystem);
runner.setIncomingConnection(false);
runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true");
runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
runner.setProperty(GetHDFSFileInfo.DESTINATION, destination);
runner.setProperty(GetHDFSFileInfo.GROUPING, grouping);
if (batchSize != null) {
runner.setProperty(GetHDFSFileInfo.BATCH_SIZE, batchSize);
}
runner.run();
runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0);
runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, expectedNrTransferredToSuccess);
runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0);
runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0);
}
private void checkContentSizes(List<Integer> expectedNumberOfRecords) {
List<Integer> actualNumberOfRecords = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS).stream()
.map(MockFlowFile::toByteArray)
.map(String::new)
.map(
content -> Arrays.stream(content.split("\n"))
.filter(line -> SINGLE_JSON_PATTERN.matcher(line).matches())
.count()
)
.map(Long::intValue)
.collect(Collectors.toList());
assertEquals(expectedNumberOfRecords, actualNumberOfRecords);
}
/*
* For all basic tests, this provides a structure of files in dirs:
* Total number of dirs: 9 (1 root, 4 dotted)