mirror of https://github.com/apache/nifi.git
NIFI-271
This commit is contained in:
parent
9a3b6bed62
commit
10860944d1
|
@ -1,18 +1,18 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
|
|
@ -45,16 +45,15 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
/**
|
||||
* This is a base class that is helpful when building processors interacting
|
||||
* with HDFS.
|
||||
* This is a base class that is helpful when building processors interacting with HDFS.
|
||||
*/
|
||||
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
||||
|
||||
// properties
|
||||
public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
|
||||
.name("Hadoop Configuration Resources")
|
||||
.description(
|
||||
"A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
|
||||
.description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
|
||||
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
|
||||
.required(false)
|
||||
.addValidator(createMultipleFilesExistValidator())
|
||||
.build();
|
||||
|
|
|
@ -38,31 +38,21 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
|
|||
|
||||
/**
|
||||
* <p>
|
||||
* This processor is used to create a Hadoop Sequence File, which essentially is
|
||||
* a file of key/value pairs. The key will be a file name and the value will be
|
||||
* the flow file content. The processor will take either a merged (a.k.a.
|
||||
* packaged) flow file or a singular flow file. Historically, this processor
|
||||
* handled the merging by type and size or time prior to creating a SequenceFile
|
||||
* output; it no longer does this. If creating a SequenceFile that contains
|
||||
* multiple files of the same type is desired, precede this processor with a
|
||||
* <code>RouteOnAttribute</code> processor to segregate files of the same type
|
||||
* and follow that with a <code>MergeContent</code> processor to bundle up
|
||||
* files. If the type of files is not important, just use the
|
||||
* <code>MergeContent</code> processor. When using the <code>MergeContent</code>
|
||||
* processor, the following Merge Formats are supported by this processor:
|
||||
* This processor is used to create a Hadoop Sequence File, which essentially is a file of key/value pairs. The key will be a file name and the value will be the flow file content. The processor will
|
||||
* take either a merged (a.k.a. packaged) flow file or a singular flow file. Historically, this processor handled the merging by type and size or time prior to creating a SequenceFile output; it no
|
||||
* longer does this. If creating a SequenceFile that contains multiple files of the same type is desired, precede this processor with a <code>RouteOnAttribute</code> processor to segregate files of
|
||||
* the same type and follow that with a <code>MergeContent</code> processor to bundle up files. If the type of files is not important, just use the <code>MergeContent</code> processor. When using the
|
||||
* <code>MergeContent</code> processor, the following Merge Formats are supported by this processor:
|
||||
* <ul>
|
||||
* <li>TAR</li>
|
||||
* <li>ZIP</li>
|
||||
* <li>FlowFileStream v3</li>
|
||||
* </ul>
|
||||
* The created SequenceFile is named the same as the incoming FlowFile with the
|
||||
* suffix '.sf'. For incoming FlowFiles that are bundled, the keys in the
|
||||
* SequenceFile are the individual file names, the values are the contents of
|
||||
* each file.
|
||||
* The created SequenceFile is named the same as the incoming FlowFile with the suffix '.sf'. For incoming FlowFiles that are bundled, the keys in the SequenceFile are the individual file names, the
|
||||
* values are the contents of each file.
|
||||
* </p>
|
||||
* NOTE: The value portion of a key/value pair is loaded into memory. While
|
||||
* there is a max size limit of 2GB, this could cause memory issues if there are
|
||||
* too many concurrent tasks and the flow file sizes are large.
|
||||
* NOTE: The value portion of a key/value pair is loaded into memory. While there is a max size limit of 2GB, this could cause memory issues if there are too many concurrent tasks and the flow file
|
||||
* sizes are large.
|
||||
*
|
||||
*/
|
||||
@SideEffectFree
|
||||
|
|
|
@ -65,8 +65,10 @@ import org.apache.nifi.util.StopWatch;
|
|||
@Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
|
||||
@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
|
||||
@WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
|
||||
@WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
|
||||
@WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if "
|
||||
+ "the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse "
|
||||
+ "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
|
||||
@SeeAlso(PutHDFS.class)
|
||||
public class GetHDFS extends AbstractHadoopProcessor {
|
||||
|
||||
|
@ -112,16 +114,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
|
||||
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
|
||||
.name("File Filter Regex")
|
||||
.description(
|
||||
"A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched")
|
||||
.description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
|
||||
+ "Expression will be fetched, otherwise all files will be fetched")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder()
|
||||
.name("Filter Match Name Only")
|
||||
.description(
|
||||
"If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison")
|
||||
.description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename "
|
||||
+ "in the regex comparison")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
|
@ -137,21 +139,17 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
|
||||
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
|
||||
.name("Minimum File Age")
|
||||
.description(
|
||||
"The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
|
||||
.description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
|
||||
.required(true)
|
||||
.addValidator(
|
||||
StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
|
||||
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
|
||||
.defaultValue("0 sec")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
|
||||
.name("Maximum File Age")
|
||||
.description(
|
||||
"The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
|
||||
.description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
|
||||
.required(false)
|
||||
.addValidator(
|
||||
StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
|
||||
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
|
@ -389,11 +387,11 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
/**
|
||||
* Do a listing of HDFS if the POLLING_INTERVAL has lapsed.
|
||||
*
|
||||
* Will return null if POLLING_INTERVAL has not lapsed. Will return an empty
|
||||
* set if no files were found on HDFS that matched the configured filters.
|
||||
* @param context
|
||||
* @return
|
||||
* @throws java.io.IOException
|
||||
* Will return null if POLLING_INTERVAL has not lapsed. Will return an empty set if no files were found on HDFS that matched the configured filters.
|
||||
*
|
||||
* @param context context
|
||||
* @return null if POLLING_INTERVAL has not lapsed. Will return an empty set if no files were found on HDFS that matched the configured filters
|
||||
* @throws java.io.IOException ex
|
||||
*/
|
||||
protected Set<Path> performListing(final ProcessContext context) throws IOException {
|
||||
|
||||
|
@ -417,11 +415,12 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
|
||||
/**
|
||||
* Poll HDFS for files to process that match the configured file filters.
|
||||
* @param hdfs
|
||||
* @param dir
|
||||
* @param filesVisited
|
||||
* @return
|
||||
* @throws java.io.IOException
|
||||
*
|
||||
* @param hdfs hdfs
|
||||
* @param dir dir
|
||||
* @param filesVisited filesVisited
|
||||
* @return files to process
|
||||
* @throws java.io.IOException ex
|
||||
*/
|
||||
protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
|
||||
if (null == filesVisited) {
|
||||
|
@ -465,11 +464,11 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the relative path of the child that does not include the filename
|
||||
* or the root path.
|
||||
* @param root
|
||||
* @param child
|
||||
* @return
|
||||
* Returns the relative path of the child that does not include the filename or the root path.
|
||||
*
|
||||
* @param root root
|
||||
* @param child child
|
||||
* @return the relative path of the child that does not include the filename or the root path
|
||||
*/
|
||||
public static String getPathDifference(final Path root, final Path child) {
|
||||
final int depthDiff = child.depth() - root.depth();
|
||||
|
@ -492,8 +491,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Holder for a snapshot in time of some processor properties that are
|
||||
* passed around.
|
||||
* Holder for a snapshot in time of some processor properties that are passed around.
|
||||
*/
|
||||
protected static class ProcessorConfiguration {
|
||||
|
||||
|
|
|
@ -40,19 +40,13 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* This processor is used to pull files from HDFS. The files being pulled in
|
||||
* MUST be SequenceFile formatted files. The processor creates a flow file for
|
||||
* each key/value entry in the ingested SequenceFile. The created flow file's
|
||||
* content depends on the value of the optional configuration property FlowFile
|
||||
* Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR.
|
||||
* With the former, only the SequenceFile value element is written to the flow
|
||||
* file contents. With the latter, the SequenceFile key and value are written to
|
||||
* the flow file contents as serialized objects; the format is key length (int),
|
||||
* key(String), value length(int), value(bytes). The default is VALUE ONLY.
|
||||
* This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested
|
||||
* SequenceFile. The created flow file's content depends on the value of the optional configuration property FlowFile Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR. With the
|
||||
* former, only the SequenceFile value element is written to the flow file contents. With the latter, the SequenceFile key and value are written to the flow file contents as serialized objects; the
|
||||
* format is key length (int), key(String), value length(int), value(bytes). The default is VALUE ONLY.
|
||||
* <p>
|
||||
* NOTE: This processor loads the entire value entry into memory. While the size
|
||||
* limit for a value entry is 2GB, this will cause memory problems if there are
|
||||
* too many concurrent tasks and the data being ingested is large.
|
||||
* NOTE: This processor loads the entire value entry into memory. While the size limit for a value entry is 2GB, this will cause memory problems if there are too many concurrent tasks and the data
|
||||
* being ingested is large.
|
||||
*
|
||||
*/
|
||||
@TriggerWhenEmpty
|
||||
|
|
|
@ -43,10 +43,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class reads a SequenceFile and generates FlowFiles, one per KeyValue
|
||||
* pair in the SequenceFile. The FlowFile name is based on the incoming file
|
||||
* name with System nanotime appended; the FlowFile content is the key/value
|
||||
* pair serialized via Text.
|
||||
* This class reads a SequenceFile and generates FlowFiles, one per KeyValue pair in the SequenceFile. The FlowFile name is based on the incoming file name with System nanotime appended; the
|
||||
* FlowFile content is the key/value pair serialized via Text.
|
||||
*/
|
||||
public class KeyValueReader implements SequenceFileReader<Set<FlowFile>> {
|
||||
|
||||
|
|
|
@ -42,9 +42,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class reads a SequenceFile and generates FlowFiles, one for each
|
||||
* KeyValue Pair in the SequenceFile. The FlowFile name is the key, which is
|
||||
* typically a file name but may not be; the FlowFile content is the value.
|
||||
* This class reads a SequenceFile and generates FlowFiles, one for each KeyValue Pair in the SequenceFile. The FlowFile name is the key, which is typically a file name but may not be; the FlowFile
|
||||
* content is the value.
|
||||
*
|
||||
*/
|
||||
public class ValueReader implements SequenceFileReader<Set<FlowFile>> {
|
||||
|
|
|
@ -24,10 +24,8 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This class allows the user to define byte-array filters or single-byte
|
||||
* filters that will modify the content that is written to the underlying
|
||||
* stream. Each filter can be given a maximum number of replacements that it
|
||||
* should perform.
|
||||
* This class allows the user to define byte-array filters or single-byte filters that will modify the content that is written to the underlying stream. Each filter can be given a maximum number of
|
||||
* replacements that it should perform.
|
||||
*/
|
||||
public class ByteFilteringOutputStream extends FilterOutputStream {
|
||||
|
||||
|
@ -66,8 +64,7 @@ public class ByteFilteringOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Causes this stream to write <tt>replaceWith</tt> in place of
|
||||
* <tt>toReplace</tt> if {@link #write(byte[], int, int)} is called where
|
||||
* the value to write is equal to
|
||||
* <tt>toReplace</tt> if {@link #write(byte[], int, int)} is called where the value to write is equal to
|
||||
* <tt>toReplace</tt>.
|
||||
* <p/>
|
||||
* @param toReplace the byte array to replace
|
||||
|
@ -79,14 +76,12 @@ public class ByteFilteringOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Causes this stream to write <tt>replaceWith</tt> in place of
|
||||
* <tt>toReplace</tt> if {@link #write(byte[], int, int)} is called where
|
||||
* the value to write is equal to
|
||||
* <tt>toReplace</tt> if {@link #write(byte[], int, int)} is called where the value to write is equal to
|
||||
* <tt>toReplace</tt>.
|
||||
* <p/>
|
||||
* @param toReplace the byte array to replace
|
||||
* @param replaceWith the byte array to be substituted
|
||||
* @param maxReplacements the maximum number of replacements that should be
|
||||
* made
|
||||
* @param maxReplacements the maximum number of replacements that should be made
|
||||
*/
|
||||
public void addFilter(final byte[] toReplace, final byte[] replaceWith, final int maxReplacements) {
|
||||
multiByteFilters.add(new Filter(toReplace, replaceWith, maxReplacements));
|
||||
|
@ -94,8 +89,7 @@ public class ByteFilteringOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Causes this stream to write <tt>replaceWith</tt> in place of
|
||||
* <tt>toReplace</tt> if {@link #write(int)} is called where the value to
|
||||
* write is equal to
|
||||
* <tt>toReplace</tt> if {@link #write(int)} is called where the value to write is equal to
|
||||
* <tt>toReplace</tt>.
|
||||
* <p/>
|
||||
* @param toReplace the byte to replace
|
||||
|
@ -107,14 +101,12 @@ public class ByteFilteringOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Causes this stream to write <tt>replaceWith</tt> in place of
|
||||
* <tt>toReplace</tt> if {@link #write(int)} is called where the value to
|
||||
* write is equal to
|
||||
* <tt>toReplace</tt> if {@link #write(int)} is called where the value to write is equal to
|
||||
* <tt>toReplace</tt>.
|
||||
* <p/>
|
||||
* @param toReplace the byte to replace
|
||||
* @param replaceWith the byte to be substituted
|
||||
* @param maxReplacements the maximum number of replacements that should be
|
||||
* made
|
||||
* @param maxReplacements the maximum number of replacements that should be made
|
||||
*/
|
||||
public void addFilter(final byte toReplace, final byte replaceWith, final int maxReplacements) {
|
||||
singleByteFilters.add(new Filter(new byte[]{toReplace}, new byte[]{replaceWith}, maxReplacements));
|
||||
|
|
|
@ -24,10 +24,8 @@ import java.io.InputStream;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Simple implementation of {@link Writable} that writes data from an
|
||||
* InputStream. This class will throw an
|
||||
* <tt>UnsupportedOperationException</tt> if {@link #readFields(DataInput)} is
|
||||
* called.
|
||||
* Simple implementation of {@link Writable} that writes data from an InputStream. This class will throw an
|
||||
* <tt>UnsupportedOperationException</tt> if {@link #readFields(DataInput)} is called.
|
||||
*/
|
||||
public class InputStreamWritable implements Writable {
|
||||
|
||||
|
|
|
@ -28,8 +28,7 @@ import org.apache.hadoop.io.DataInputBuffer;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* This class will write to an output stream, rather than an in-memory buffer,
|
||||
* the fields being read.
|
||||
* This class will write the fields being read to an output stream, rather than to an in-memory buffer.
|
||||
*
|
||||
* @author unattributed
|
||||
*
|
||||
|
|
|
@ -25,15 +25,13 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|||
public interface SequenceFileWriter {
|
||||
|
||||
/**
|
||||
* Creates a Sequence File by writing the given FlowFile as key/value pairs.
|
||||
* The provided FlowFile may be a package of multiple FlowFiles, or just
|
||||
* one. The keys for the Sequence File are the flow files' logical names.
|
||||
* The values are the flow files' content.
|
||||
* Creates a Sequence File by writing the given FlowFile as key/value pairs. The provided FlowFile may be a package of multiple FlowFiles, or just one. The keys for the Sequence File are the flow
|
||||
* files' logical names. The values are the flow files' content.
|
||||
*
|
||||
* @param flowFile - the FlowFile to write to the Sequence File.
|
||||
* @param session
|
||||
* @param configuration
|
||||
* @param compressionType
|
||||
* @param session session
|
||||
* @param configuration configuration
|
||||
* @param compressionType compression type
|
||||
* @return the flow file that was written to the SequenceFile
|
||||
*/
|
||||
FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType);
|
||||
|
|
Loading…
Reference in New Issue