NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS

Author: Mark Payne
Date:   2015-04-28 08:46:38 -04:00
Commit: e4f431561e
Parent: dc7f7a82a6

8 changed files with 416 additions and 44 deletions

AbstractHadoopProcessor.java

@@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     // variables shared by all threads of this processor
     // Hadoop Configuration and FileSystem
-    protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+    private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();

     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
             config.set(disableCacheName, "true");
-            final FileSystem fs = FileSystem.get(config);
+            final FileSystem fs = getFileSystem(config);
             getLogger().info(
                 "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                 new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
@@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
     }

+    /**
+     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
+     * for UDP packets to be received
+     *
+     * @param config the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws IOException {
+        return FileSystem.get(config);
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
      */
@@ -243,4 +255,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
         return builder.toString();
     }

+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getKey();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getValue();
+    }
 }
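
The new protected getFileSystem(Configuration) hook is the seam the unit tests lean on: a subclass can hand back any FileSystem it likes instead of letting FileSystem.get(config) try to reach a real cluster and wait out its connection timeouts. A minimal sketch of the pattern, not part of this commit (the subclass name is illustrative, and Hadoop's local file system stands in for whatever mock a test prefers):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.nifi.processors.hadoop.FetchHDFS;

// Test-only subclass: short-circuits the HDFS connection by overriding the new hook.
public class FetchHDFSWithLocalFileSystem extends FetchHDFS {

    @Override
    protected FileSystem getFileSystem(final Configuration config) throws IOException {
        // Return the local file system rather than calling FileSystem.get(config),
        // which can block for a long time when no cluster is reachable.
        return FileSystem.getLocal(config);
    }
}

The no-argument getConfiguration() and getFileSystem() accessors added at the bottom of the class are what the concrete processors below now call instead of reaching into the (now private) hdfsResources field directly.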

CreateHadoopSequenceFile.java

@@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {

FetchHDFS.java

@@ -99,7 +99,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }

-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
         final URI uri = path.toUri();

GetHDFS.java

@@ -236,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
         abstractOnScheduled(context);
         // copy configuration values to pass them around cleanly
         processorConfig = new ProcessorConfiguration(context);
-        FileSystem fs = hdfsResources.get().getValue();
-        Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+        final FileSystem fs = getFileSystem();
+        final Path dir = new Path(context.getProperty(DIRECTORY).getValue());
         if (!fs.exists(dir)) {
             throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
         }
@@ -330,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         FSDataInputStream stream = null;
-        Configuration conf = hdfsResources.get().getKey();
-        FileSystem hdfs = hdfsResources.get().getValue();
+        Configuration conf = getConfiguration();
+        FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
@@ -398,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
-                final FileSystem hdfs = hdfsResources.get().getValue();
+                final FileSystem hdfs = getFileSystem();
                 // get listing
                 listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
                 lastPollTime.set(System.currentTimeMillis());

GetHDFSSequenceFile.java

@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;

 /**
  * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested
@@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
     @Override
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
-        final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
-        final Configuration conf = hadoopResources.getKey();
-        final FileSystem hdfs = hadoopResources.getValue();
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
         final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);

ListHDFS.java

@@ -118,12 +118,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private volatile Long lastListingTime = null;
     private volatile Set<Path> latestPathsListed = new HashSet<>();
     private volatile boolean electedPrimaryNode = false;
-    private File persistenceFile = null;

     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
-        persistenceFile = new File("conf/state/" + getIdentifier());
+    }
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
     }

     @Override
@@ -143,7 +145,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return relationships;
     }

-    private String getKey(final String directory) {
+    protected String getKey(final String directory) {
         return getIdentifier() + ".lastListingTime." + directory;
     }

@@ -169,18 +171,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }

-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
-
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
         // Determine the timestamp for the last file that we've listed.
         Long minTimestamp = lastListingTime;
         if ( minTimestamp == null || electedPrimaryNode ) {
             // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
             // we have performed a listing. In this case,
             // First, attempt to get timestamp from distributed cache service.
-            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
             try {
                 final StringSerDe serde = new StringSerDe();
                 final String serializedState = client.get(getKey(directory), serde, serde);
@@ -197,14 +194,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
                 this.lastListingTime = minTimestamp;
                 electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
             } catch (final IOException ioe) {
-                getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
-                context.yield();
-                return;
+                throw ioe;
             }

             // Check the persistence file. We want to use the latest timestamp that we have so that
             // we don't duplicate data.
             try {
+                final File persistenceFile = getPersistenceFile();
                 if ( persistenceFile.exists() ) {
                     try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                         final Properties props = new Properties();
@@ -240,9 +236,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
         }

+        return minTimestamp;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(directory, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+            context.yield();
+            return;
+        }
+
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
@@ -311,7 +323,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }

         // Attempt to save state to remote server.
-        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
         try {
             client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
         } catch (final IOException ioe) {
@@ -397,11 +408,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }

-    private void persistLocalState(final String directory, final String serializedState) throws IOException {
+    protected void persistLocalState(final String directory, final String serializedState) throws IOException {
         // we need to keep track of all files that we pulled in that had a modification time equal to
         // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
         // that have a mod time equal to that timestamp because more files may come in with the same timestamp
        // later in the same millisecond.
+        final File persistenceFile = getPersistenceFile();
         final File dir = persistenceFile.getParentFile();
         if ( !dir.exists() && !dir.mkdirs() ) {
             throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");

PutHDFS.java

@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;

 /**
  * This processor copies FlowFiles to HDFS.
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        final Configuration conf = resources.getKey();
+        final Configuration conf = getConfiguration();
         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
     }

@@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }

-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        if (resources == null || resources.getKey() == null || resources.getValue() == null) {
+        final Configuration configuration = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        if (configuration == null || hdfs == null) {
             getLogger().error("HDFS not configured properly");
             session.transfer(flowFile, REL_FAILURE);
             context.yield();
             return;
         }

-        final Configuration conf = resources.getKey();
-        final FileSystem hdfs = resources.getValue();
-
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
-                .getValue());
+        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
         final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();

         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
         final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);

         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
-                BUFFER_SIZE_DEFAULT);
+        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);

         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
@@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         // Create destination directory if it does not exist
         try {
-            if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+            if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
                 throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
             }
         } catch (FileNotFoundException fe) {

TestListHDFS.java (new file)

@@ -0,0 +1,347 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestListHDFS {

    private TestRunner runner;
    private ListHDFSWithMockedFileSystem proc;
    private MockCacheClient service;

    @Before
    public void setup() throws InitializationException {
        proc = new ListHDFSWithMockedFileSystem();
        runner = TestRunners.newTestRunner(proc);

        service = new MockCacheClient();
        runner.addControllerService("service", service);
        runner.enableControllerService(service);

        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
        runner.setProperty(ListHDFS.DIRECTORY, "/test");
        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
    }

    @Test
    public void testListingHasCorrectAttributes() {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
        mff.assertAttributeEquals("path", "/test");
        mff.assertAttributeEquals("filename", "testFile.txt");
    }
    @Test
    public void testRecursive() {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);

        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
        mff1.assertAttributeEquals("path", "/test");
        mff1.assertAttributeEquals("filename", "testFile.txt");

        final MockFlowFile mff2 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(1);
        mff2.assertAttributeEquals("path", "/test/testDir");
        mff2.assertAttributeEquals("filename", "1.txt");
    }

    @Test
    public void testNotRecursive() {
        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));

        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
        mff1.assertAttributeEquals("path", "/test");
        mff1.assertAttributeEquals("filename", "testFile.txt");
    }
    @Test
    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));

        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
        mff1.assertAttributeEquals("path", "/test");
        mff1.assertAttributeEquals("filename", "testFile.txt");

        runner.clearTransferState();

        // add new file to pull
        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));

        // trigger primary node change
        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);

        // cause calls to service to fail
        service.failOnCalls = true;

        runner.run();
        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);

        runner.run();
        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);

        final String key = proc.getKey("/test");

        // wait just a bit to ensure that the timestamp changes when we update the service
        final Object curVal = service.values.get(key);
        try {
            Thread.sleep(10L);
        } catch (final InterruptedException ie) {
        }

        service.failOnCalls = false;
        runner.run();

        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);

        // ensure state saved both locally & remotely
        assertTrue(proc.localStateSaved);
        assertNotNull(service.values.get(key));
        assertNotSame(curVal, service.values.get(key));
    }

    private FsPermission create777() {
        return new FsPermission((short) 0777);
    }
    private class ListHDFSWithMockedFileSystem extends ListHDFS {
        private final MockFileSystem fileSystem = new MockFileSystem();
        private boolean localStateSaved = false;

        @Override
        protected FileSystem getFileSystem() {
            return fileSystem;
        }

        @Override
        protected File getPersistenceFile() {
            return new File("target/conf/state-file");
        }

        @Override
        protected FileSystem getFileSystem(Configuration config) throws IOException {
            return fileSystem;
        }

        @Override
        protected void persistLocalState(String directory, String serializedState) throws IOException {
            super.persistLocalState(directory, serializedState);
            localStateSaved = true;
        }
    }
    private class MockFileSystem extends FileSystem {
        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();

        public void addFileStatus(final Path parent, final FileStatus child) {
            Set<FileStatus> children = fileStatuses.get(parent);
            if ( children == null ) {
                children = new HashSet<>();
                fileStatuses.put(parent, children);
            }

            children.add(child);
        }

        @Override
        public long getDefaultBlockSize() {
            return 1024L;
        }

        @Override
        public short getDefaultReplication() {
            return 1;
        }

        @Override
        public URI getUri() {
            return null;
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return null;
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return null;
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return null;
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return false;
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return false;
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            final Set<FileStatus> statuses = fileStatuses.get(f);
            if ( statuses == null ) {
                return new FileStatus[0];
            }

            return statuses.toArray(new FileStatus[statuses.size()]);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
        }

        @Override
        public Path getWorkingDirectory() {
            return new Path(new File(".").getAbsolutePath());
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return false;
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return null;
        }
    }
    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
        private ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
        private boolean failOnCalls = false;

        private void verifyNotFail() throws IOException {
            if ( failOnCalls ) {
                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
            }
        }

        @Override
        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            verifyNotFail();
            final Object retValue = values.putIfAbsent(key, value);
            return (retValue == null);
        }

        @Override
        @SuppressWarnings("unchecked")
        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
            verifyNotFail();
            return (V) values.putIfAbsent(key, value);
        }

        @Override
        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
            verifyNotFail();
            return values.containsKey(key);
        }

        @Override
        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            verifyNotFail();
            values.put(key, value);
        }

        @Override
        @SuppressWarnings("unchecked")
        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
            verifyNotFail();
            return (V) values.get(key);
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
            verifyNotFail();
            values.remove(key);
            return true;
        }
    }
}