mirror of https://github.com/apache/nifi.git
NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet
- Creating nifi-record-utils to share utility code from record services
- Refactoring Parquet tests to use MockRecordParser and MockRecordWriter
- Refactoring AbstractPutHDFSRecord to use schema access strategy
- Adding custom validate to AbstractPutHDFSRecord and adding handling of UNION types when writing Records as Avro
- Refactoring project structure to get CS API references out of nifi-commons, introducing nifi-extension-utils under nifi-nar-bundles
- Updating abstract put/fetch processors to obtain the WriteResult and update flow file attributes

This closes #1712.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
parent 11b935a27b
commit 60d88b5a64
@ -1236,6 +1236,14 @@ and can be found in the org.apache.hadoop.hive.ql.io.orc package
https://github.com/triplecheck/TLSH

(ASLv2) Apache Parquet
The following NOTICE information applies:
Apache Parquet MR (Incubating)
Copyright 2014 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

(ASLv2) Hortonworks Schema Registry
The following NOTICE information applies:
Hortonworks Schema Registry
@ -479,6 +479,11 @@
<artifactId>nifi-cdc-mysql-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-parquet-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hwx-schema-registry-nar</artifactId>
@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-record</artifactId>
<description>
This module contains the domain model for NiFi's Record abstraction, including
several interfaces for interacting with Records. This module should not depend
on any external libraries.
</description>

</project>
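To make the Record abstraction described above concrete, here is a minimal sketch (not part of the commit; the field names are invented for illustration). It only uses classes that appear elsewhere in this diff; SimpleRecordSchema is imported from org.apache.nifi.serialization as done in the Avro utilities below.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class RecordExample {
    public static void main(final String[] args) {
        // Describe the shape of the data: each field is a name plus a DataType.
        final List<RecordField> fields = new ArrayList<>();
        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("favoriteNumber", RecordFieldType.INT.getDataType()));
        final RecordSchema schema = new SimpleRecordSchema(fields);

        // A Record pairs that schema with concrete values; MapRecord is the map-backed implementation.
        final Map<String, Object> values = new HashMap<>();
        values.put("name", "nifi");
        values.put("favoriteNumber", 7);
        final Record record = new MapRecord(schema, values);
    }
}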
@ -17,13 +17,13 @@
|
|||
|
||||
package org.apache.nifi.serialization.record;
|
||||
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
public class MapRecord implements Record {
|
||||
private final RecordSchema schema;
|
||||
private final Map<String, Object> values;
|
|
@ -17,12 +17,12 @@
|
|||
|
||||
package org.apache.nifi.serialization.record;
|
||||
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
public class RecordField {
|
||||
private final String fieldName;
|
||||
private final DataType dataType;
|
|
@ -17,17 +17,17 @@
|
|||
|
||||
package org.apache.nifi.serialization.record;
|
||||
|
||||
import org.apache.nifi.serialization.record.type.ArrayDataType;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.type.MapDataType;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.serialization.record.type.ArrayDataType;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.type.MapDataType;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
|
||||
public enum RecordFieldType {
|
||||
/**
|
||||
* A String field type. Fields of this type use a {@code java.lang.String} value.
|
|
@ -17,11 +17,11 @@
|
|||
|
||||
package org.apache.nifi.serialization.record.type;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class ArrayDataType extends DataType {
|
||||
private final DataType elementType;
|
||||
|
|
@ -17,12 +17,12 @@
|
|||
|
||||
package org.apache.nifi.serialization.record.type;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ChoiceDataType extends DataType {
|
||||
private final List<DataType> possibleSubTypes;
|
||||
|
|
@ -17,11 +17,11 @@
|
|||
|
||||
package org.apache.nifi.serialization.record.type;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class MapDataType extends DataType {
|
||||
private final DataType valueType;
|
||||
|
|
@ -17,12 +17,12 @@
|
|||
|
||||
package org.apache.nifi.serialization.record.type;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class RecordDataType extends DataType {
|
||||
private final RecordSchema childSchema;
|
||||
|
|
@ -17,6 +17,14 @@
|
|||
|
||||
package org.apache.nifi.serialization.record.util;
|
||||
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.sql.Date;
|
||||
import java.sql.Time;
|
||||
|
@ -30,14 +38,6 @@ import java.util.Optional;
|
|||
import java.util.TimeZone;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
|
||||
public class DataTypeUtils {
|
||||
|
||||
private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
|
|
@ -33,11 +33,10 @@
<module>nifi-socket-utils</module>
<module>nifi-utils</module>
<module>nifi-web-utils</module>
<module>nifi-processor-utilities</module>
<module>nifi-write-ahead-log</module>
<module>nifi-site-to-site-client</module>
<module>nifi-hl7-query-language</module>
<module>nifi-hadoop-utils</module>
<module>nifi-schema-utils</module>
<module>nifi-record</module>
</modules>
</project>
@ -17,7 +17,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-utils</artifactId>
@ -28,7 +28,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -37,10 +36,12 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependency>
<!-- Other modules using nifi-hadoop-utils are expected to have the below dependencies available, typically through a NAR dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
@ -17,15 +17,11 @@
|
|||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.io.compress.Lz4Codec;
|
||||
import org.apache.hadoop.io.compress.SnappyCodec;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
|
||||
|
@ -34,7 +30,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
@ -43,7 +38,6 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import java.io.File;
|
||||
|
@ -67,32 +61,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
*/
|
||||
@RequiresInstanceClassLoading(cloneAncestorResources = true)
|
||||
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
||||
/**
|
||||
* Compression Type Enum
|
||||
*/
|
||||
public enum CompressionType {
|
||||
NONE,
|
||||
DEFAULT,
|
||||
BZIP,
|
||||
GZIP,
|
||||
LZ4,
|
||||
SNAPPY,
|
||||
AUTOMATIC;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
switch (this) {
|
||||
case NONE: return "NONE";
|
||||
case DEFAULT: return DefaultCodec.class.getName();
|
||||
case BZIP: return BZip2Codec.class.getName();
|
||||
case GZIP: return GzipCodec.class.getName();
|
||||
case LZ4: return Lz4Codec.class.getName();
|
||||
case SNAPPY: return SnappyCodec.class.getName();
|
||||
case AUTOMATIC: return "Automatically Detected";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// properties
|
||||
public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
|
||||
|
@ -100,7 +68,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
|
||||
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
|
||||
.required(false)
|
||||
.addValidator(createMultipleFilesExistValidator())
|
||||
.addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
|
||||
|
@ -135,6 +103,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
.dynamicallyModifiesClasspath(true)
|
||||
.build();
|
||||
|
||||
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
|
||||
|
||||
private static final Object RESOURCES_LOCK = new Object();
|
||||
|
||||
private long kerberosReloginThreshold;
|
||||
|
@ -360,34 +330,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Validates that one or more files exist, as specified in a single property.
|
||||
*/
|
||||
public static final Validator createMultipleFilesExistValidator() {
|
||||
return new Validator() {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
final String[] files = input.split(",");
|
||||
for (String filename : files) {
|
||||
try {
|
||||
final File file = new File(filename.trim());
|
||||
final boolean valid = file.exists() && file.isFile();
|
||||
if (!valid) {
|
||||
final String message = "File " + file + " does not exist or is not a file";
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
|
||||
}
|
||||
} catch (SecurityException e) {
|
||||
final String message = "Unable to access " + filename + " due to " + e.getMessage();
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
|
||||
}
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured CompressionCodec, or null if none is configured.
|
||||
*
|
||||
|
@ -482,7 +424,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
|
||||
}
|
||||
|
||||
|
||||
static protected class HdfsResources {
|
||||
private final Configuration configuration;
|
||||
private final FileSystem fileSystem;
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.io.compress.Lz4Codec;
|
||||
import org.apache.hadoop.io.compress.SnappyCodec;
|
||||
|
||||
/**
|
||||
* Compression Type Enum for Hadoop related processors.
|
||||
*/
|
||||
public enum CompressionType {
|
||||
NONE,
|
||||
DEFAULT,
|
||||
BZIP,
|
||||
GZIP,
|
||||
LZ4,
|
||||
SNAPPY,
|
||||
AUTOMATIC;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
switch (this) {
|
||||
case NONE: return "NONE";
|
||||
case DEFAULT: return DefaultCodec.class.getName();
|
||||
case BZIP: return BZip2Codec.class.getName();
|
||||
case GZIP: return GzipCodec.class.getName();
|
||||
case LZ4: return Lz4Codec.class.getName();
|
||||
case SNAPPY: return SnappyCodec.class.getName();
|
||||
case AUTOMATIC: return "Automatically Detected";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
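The enum's toString() deliberately returns the fully qualified codec class name (or "NONE" / "Automatically Detected") so the selected value can be handed to Hadoop's codec machinery. A minimal sketch of that intent follows; the wiring shown here is illustrative, not code from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressionTypeExample {
    public static void main(final String[] args) throws Exception {
        // Resolve the codec class named by the enum value and instantiate it via Hadoop's ReflectionUtils.
        final String codecName = CompressionType.GZIP.toString();   // "org.apache.hadoop.io.compress.GzipCodec"
        final Class<?> codecClass = Class.forName(codecName);       // throws ClassNotFoundException if the codec is absent
        final CompressionCodec codec =
                (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
    }
}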
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* Validators for Hadoop related processors.
|
||||
*/
|
||||
public interface HadoopValidators {
|
||||
|
||||
/*
|
||||
* Validates that one or more files exist, as specified in a single property.
|
||||
*/
|
||||
Validator ONE_OR_MORE_FILE_EXISTS_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
final String[] files = input.split(",");
|
||||
for (String filename : files) {
|
||||
try {
|
||||
final File file = new File(filename.trim());
|
||||
final boolean valid = file.exists() && file.isFile();
|
||||
if (!valid) {
|
||||
final String message = "File " + file + " does not exist or is not a file";
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
|
||||
}
|
||||
} catch (SecurityException e) {
|
||||
final String message = "Unable to access " + filename + " due to " + e.getMessage();
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
|
||||
}
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/*
|
||||
* Validates that a property is a valid umask, i.e. a short octal number that is not negative.
|
||||
*/
|
||||
Validator UMASK_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
|
||||
String reason = null;
|
||||
try {
|
||||
final short shortVal = Short.parseShort(value, 8);
|
||||
if (shortVal < 0) {
|
||||
reason = "octal umask [" + value + "] cannot be negative";
|
||||
} else if (shortVal > 511) {
|
||||
// HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
|
||||
reason = "octal umask [" + value + "] is not a valid umask";
|
||||
}
|
||||
} catch (final NumberFormatException e) {
|
||||
reason = "[" + value + "] is not a valid short octal number";
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
|
||||
.build();
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* Validates that a property is a valid short number greater than 0.
|
||||
*/
|
||||
Validator POSITIVE_SHORT_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
|
||||
String reason = null;
|
||||
try {
|
||||
final short shortVal = Short.parseShort(value);
|
||||
if (shortVal <= 0) {
|
||||
reason = "short integer must be greater than zero";
|
||||
}
|
||||
} catch (final NumberFormatException e) {
|
||||
reason = "[" + value + "] is not a valid short integer";
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
|
||||
.build();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
|
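These shared constants replace per-processor validator factories such as the createMultipleFilesExistValidator() removed from AbstractHadoopProcessor above. A minimal sketch of attaching one to a property descriptor; the property name and description here are made up for illustration.

import org.apache.nifi.components.PropertyDescriptor;

// Hypothetical property; only the .addValidator(...) wiring reflects this commit.
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
        .name("permissions-umask")                                   // illustrative name
        .description("Octal umask applied to files written to HDFS") // illustrative description
        .addValidator(HadoopValidators.UMASK_VALIDATOR)
        .required(false)
        .build();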
@ -17,7 +17,7 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-commons</artifactId>
|
||||
<artifactId>nifi-extension-utils</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
|
@ -34,7 +34,6 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
@ -48,6 +47,7 @@
|
|||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<!-- Other modules using nifi-processor-utils are expected to have this API available, typically through a NAR dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
|
@ -0,0 +1,45 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-utils</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-avro-record-utils</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-record-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</dependency>
|
||||
<!-- Other modules using nifi-avro-record-utils are expected to have these APIs available, typically through a NAR dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,504 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.avro;
|
||||
|
||||
import org.apache.avro.LogicalType;
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericFixed;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.util.Utf8;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AvroTypeUtil {
|
||||
public static final String AVRO_SCHEMA_FORMAT = "avro";
|
||||
|
||||
public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
|
||||
if (recordSchema == null) {
|
||||
throw new IllegalArgumentException("RecordSchema cannot be null");
|
||||
}
|
||||
|
||||
final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
|
||||
if (!schemaFormatOption.isPresent()) {
|
||||
throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
|
||||
}
|
||||
|
||||
final String schemaFormat = schemaFormatOption.get();
|
||||
if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
|
||||
throw new SchemaNotFoundException("Schema provided is not in Avro format");
|
||||
}
|
||||
|
||||
final Optional<String> textOption = recordSchema.getSchemaText();
|
||||
if (!textOption.isPresent()) {
|
||||
throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
|
||||
}
|
||||
|
||||
final String text = textOption.get();
|
||||
return new Schema.Parser().parse(text);
|
||||
}
|
||||
|
||||
public static DataType determineDataType(final Schema avroSchema) {
|
||||
final Type avroType = avroSchema.getType();
|
||||
|
||||
switch (avroType) {
|
||||
case BYTES:
|
||||
case FIXED:
|
||||
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
|
||||
case ARRAY:
|
||||
final DataType elementType = determineDataType(avroSchema.getElementType());
|
||||
return RecordFieldType.ARRAY.getArrayDataType(elementType);
|
||||
case BOOLEAN:
|
||||
return RecordFieldType.BOOLEAN.getDataType();
|
||||
case DOUBLE:
|
||||
return RecordFieldType.DOUBLE.getDataType();
|
||||
case ENUM:
|
||||
case STRING:
|
||||
return RecordFieldType.STRING.getDataType();
|
||||
case FLOAT:
|
||||
return RecordFieldType.FLOAT.getDataType();
|
||||
case INT: {
|
||||
final LogicalType logicalType = avroSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return RecordFieldType.INT.getDataType();
|
||||
}
|
||||
|
||||
if (LogicalTypes.date().getName().equals(logicalType.getName())) {
|
||||
return RecordFieldType.DATE.getDataType();
|
||||
} else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
|
||||
return RecordFieldType.TIME.getDataType();
|
||||
}
|
||||
|
||||
return RecordFieldType.INT.getDataType();
|
||||
}
|
||||
case LONG: {
|
||||
final LogicalType logicalType = avroSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return RecordFieldType.LONG.getDataType();
|
||||
}
|
||||
|
||||
if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
|
||||
return RecordFieldType.TIMESTAMP.getDataType();
|
||||
} else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
|
||||
return RecordFieldType.TIMESTAMP.getDataType();
|
||||
} else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
|
||||
return RecordFieldType.TIME.getDataType();
|
||||
}
|
||||
|
||||
return RecordFieldType.LONG.getDataType();
|
||||
}
|
||||
case RECORD: {
|
||||
final List<Field> avroFields = avroSchema.getFields();
|
||||
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
|
||||
|
||||
for (final Field field : avroFields) {
|
||||
final String fieldName = field.name();
|
||||
final Schema fieldSchema = field.schema();
|
||||
final DataType fieldType = determineDataType(fieldSchema);
|
||||
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
|
||||
}
|
||||
|
||||
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
|
||||
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
|
||||
}
|
||||
case NULL:
|
||||
return RecordFieldType.STRING.getDataType();
|
||||
case MAP:
|
||||
final Schema valueSchema = avroSchema.getValueType();
|
||||
final DataType valueType = determineDataType(valueSchema);
|
||||
return RecordFieldType.MAP.getMapDataType(valueType);
|
||||
case UNION: {
|
||||
final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
|
||||
.filter(s -> s.getType() != Type.NULL)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (nonNullSubSchemas.size() == 1) {
|
||||
return determineDataType(nonNullSubSchemas.get(0));
|
||||
}
|
||||
|
||||
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
|
||||
for (final Schema subSchema : nonNullSubSchemas) {
|
||||
final DataType childDataType = determineDataType(subSchema);
|
||||
possibleChildTypes.add(childDataType);
|
||||
}
|
||||
|
||||
return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public static RecordSchema createSchema(final Schema avroSchema) {
|
||||
if (avroSchema == null) {
|
||||
throw new IllegalArgumentException("Avro Schema cannot be null");
|
||||
}
|
||||
|
||||
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
|
||||
for (final Field field : avroSchema.getFields()) {
|
||||
final String fieldName = field.name();
|
||||
final DataType dataType = AvroTypeUtil.determineDataType(field.schema());
|
||||
|
||||
recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
|
||||
}
|
||||
|
||||
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
|
||||
return recordSchema;
|
||||
}
|
||||
|
||||
public static Object[] convertByteArray(final byte[] bytes) {
|
||||
final Object[] array = new Object[bytes.length];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
array[i] = Byte.valueOf(bytes[i]);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
public static ByteBuffer convertByteArray(final Object[] bytes) {
|
||||
final ByteBuffer bb = ByteBuffer.allocate(bytes.length);
|
||||
for (final Object o : bytes) {
|
||||
if (o instanceof Byte) {
|
||||
bb.put(((Byte) o).byteValue());
|
||||
} else {
|
||||
throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer");
|
||||
}
|
||||
}
|
||||
bb.flip();
|
||||
return bb;
|
||||
}
|
||||
|
||||
public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException {
|
||||
final GenericRecord rec = new GenericData.Record(avroSchema);
|
||||
final RecordSchema recordSchema = record.getSchema();
|
||||
|
||||
for (final RecordField recordField : recordSchema.getFields()) {
|
||||
final Object rawValue = record.getValue(recordField);
|
||||
final String fieldName = recordField.getFieldName();
|
||||
|
||||
final Field field = avroSchema.getField(fieldName);
|
||||
if (field == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName);
|
||||
rec.put(fieldName, converted);
|
||||
}
|
||||
|
||||
return rec;
|
||||
}
|
||||
|
||||
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) throws IOException {
|
||||
if (rawValue == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
switch (fieldSchema.getType()) {
|
||||
case INT: {
|
||||
final LogicalType logicalType = fieldSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return DataTypeUtils.toInteger(rawValue, fieldName);
|
||||
}
|
||||
|
||||
if (LogicalTypes.date().getName().equals(logicalType.getName())) {
|
||||
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
|
||||
final Date date = new Date(longValue);
|
||||
final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant());
|
||||
final long days = duration.toDays();
|
||||
return (int) days;
|
||||
} else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
|
||||
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
|
||||
final Date date = new Date(longValue);
|
||||
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
|
||||
final long millisSinceMidnight = duration.toMillis();
|
||||
return (int) millisSinceMidnight;
|
||||
}
|
||||
|
||||
return DataTypeUtils.toInteger(rawValue, fieldName);
|
||||
}
|
||||
case LONG: {
|
||||
final LogicalType logicalType = fieldSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return DataTypeUtils.toLong(rawValue, fieldName);
|
||||
}
|
||||
|
||||
if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
|
||||
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
|
||||
final Date date = new Date(longValue);
|
||||
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
|
||||
return duration.toMillis() * 1000L;
|
||||
} else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
|
||||
return DataTypeUtils.toLong(rawValue, fieldName);
|
||||
} else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
|
||||
return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
|
||||
}
|
||||
|
||||
return DataTypeUtils.toLong(rawValue, fieldName);
|
||||
}
|
||||
case BYTES:
|
||||
case FIXED:
|
||||
if (rawValue instanceof byte[]) {
|
||||
return ByteBuffer.wrap((byte[]) rawValue);
|
||||
}
|
||||
if (rawValue instanceof Object[]) {
|
||||
return AvroTypeUtil.convertByteArray((Object[]) rawValue);
|
||||
} else {
|
||||
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
|
||||
}
|
||||
case MAP:
|
||||
if (rawValue instanceof Record) {
|
||||
final Record recordValue = (Record) rawValue;
|
||||
final Map<String, Object> map = new HashMap<>();
|
||||
for (final RecordField recordField : recordValue.getSchema().getFields()) {
|
||||
final Object v = recordValue.getValue(recordField);
|
||||
if (v != null) {
|
||||
map.put(recordField.getFieldName(), v);
|
||||
}
|
||||
}
|
||||
|
||||
return map;
|
||||
} else {
|
||||
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
|
||||
}
|
||||
case RECORD:
|
||||
final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
|
||||
|
||||
final Record record = (Record) rawValue;
|
||||
for (final RecordField recordField : record.getSchema().getFields()) {
|
||||
final Object recordFieldValue = record.getValue(recordField);
|
||||
final String recordFieldName = recordField.getFieldName();
|
||||
|
||||
final Field field = fieldSchema.getField(recordFieldName);
|
||||
if (field == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName);
|
||||
avroRecord.put(recordFieldName, converted);
|
||||
}
|
||||
return avroRecord;
|
||||
case UNION:
|
||||
List<Schema> unionFieldSchemas = fieldSchema.getTypes();
|
||||
if (unionFieldSchemas != null) {
|
||||
// Ignore null types in union
|
||||
final List<Schema> nonNullFieldSchemas = unionFieldSchemas.stream()
|
||||
.filter(s -> s.getType() != Type.NULL)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// If at least one non-null type exists, find the first compatible type
|
||||
if (nonNullFieldSchemas.size() >= 1) {
|
||||
for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
|
||||
final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName);
|
||||
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
|
||||
if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType)) {
|
||||
return avroObject;
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass()
|
||||
+ " because no compatible types exist in the UNION");
|
||||
}
|
||||
}
|
||||
return null;
|
||||
case ARRAY:
|
||||
final Object[] objectArray = (Object[]) rawValue;
|
||||
final List<Object> list = new ArrayList<>(objectArray.length);
|
||||
for (final Object o : objectArray) {
|
||||
final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName);
|
||||
list.add(converted);
|
||||
}
|
||||
return list;
|
||||
case BOOLEAN:
|
||||
return DataTypeUtils.toBoolean(rawValue, fieldName);
|
||||
case DOUBLE:
|
||||
return DataTypeUtils.toDouble(rawValue, fieldName);
|
||||
case FLOAT:
|
||||
return DataTypeUtils.toFloat(rawValue, fieldName);
|
||||
case NULL:
|
||||
return null;
|
||||
case ENUM:
|
||||
return new GenericData.EnumSymbol(fieldSchema, rawValue);
|
||||
case STRING:
|
||||
return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
|
||||
}
|
||||
|
||||
return rawValue;
|
||||
}
|
||||
|
||||
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
|
||||
final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
|
||||
|
||||
for (final RecordField recordField : recordSchema.getFields()) {
|
||||
Object value = avroRecord.get(recordField.getFieldName());
|
||||
if (value == null) {
|
||||
for (final String alias : recordField.getAliases()) {
|
||||
value = avroRecord.get(alias);
|
||||
if (value != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String fieldName = recordField.getFieldName();
|
||||
final Field avroField = avroRecord.getSchema().getField(fieldName);
|
||||
if (avroField == null) {
|
||||
values.put(fieldName, null);
|
||||
continue;
|
||||
}
|
||||
|
||||
final Schema fieldSchema = avroField.schema();
|
||||
final Object rawValue = normalizeValue(value, fieldSchema);
|
||||
|
||||
final DataType desiredType = recordField.getDataType();
|
||||
final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
|
||||
|
||||
values.put(fieldName, coercedValue);
|
||||
}
|
||||
|
||||
return values;
|
||||
}
|
||||
|
||||
private static Object normalizeValue(final Object value, final Schema avroSchema) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
switch (avroSchema.getType()) {
|
||||
case INT: {
|
||||
final LogicalType logicalType = avroSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
final String logicalName = logicalType.getName();
|
||||
if (LogicalTypes.date().getName().equals(logicalName)) {
|
||||
// date logical name means that the value is number of days since Jan 1, 1970
|
||||
return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
|
||||
} else if (LogicalTypes.timeMillis().getName().equals(logicalName)) {
|
||||
// time-millis logical name means that the value is number of milliseconds since midnight.
|
||||
return new java.sql.Time((int) value);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case LONG: {
|
||||
final LogicalType logicalType = avroSchema.getLogicalType();
|
||||
if (logicalType == null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
final String logicalName = logicalType.getName();
|
||||
if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
|
||||
return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
|
||||
} else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) {
|
||||
return new java.sql.Timestamp((long) value);
|
||||
} else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) {
|
||||
return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case UNION:
|
||||
if (value instanceof GenericData.Record) {
|
||||
final GenericData.Record avroRecord = (GenericData.Record) value;
|
||||
return normalizeValue(value, avroRecord.getSchema());
|
||||
}
|
||||
break;
|
||||
case RECORD:
|
||||
final GenericData.Record record = (GenericData.Record) value;
|
||||
final Schema recordSchema = record.getSchema();
|
||||
final List<Field> recordFields = recordSchema.getFields();
|
||||
final Map<String, Object> values = new HashMap<>(recordFields.size());
|
||||
for (final Field field : recordFields) {
|
||||
final Object avroFieldValue = record.get(field.name());
|
||||
final Object fieldValue = normalizeValue(avroFieldValue, field.schema());
|
||||
values.put(field.name(), fieldValue);
|
||||
}
|
||||
final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
|
||||
return new MapRecord(childSchema, values);
|
||||
case BYTES:
|
||||
final ByteBuffer bb = (ByteBuffer) value;
|
||||
return AvroTypeUtil.convertByteArray(bb.array());
|
||||
case FIXED:
|
||||
final GenericFixed fixed = (GenericFixed) value;
|
||||
return AvroTypeUtil.convertByteArray(fixed.bytes());
|
||||
case ENUM:
|
||||
return value.toString();
|
||||
case NULL:
|
||||
return null;
|
||||
case STRING:
|
||||
return value.toString();
|
||||
case ARRAY:
|
||||
final GenericData.Array<?> array = (GenericData.Array<?>) value;
|
||||
final Object[] valueArray = new Object[array.size()];
|
||||
for (int i = 0; i < array.size(); i++) {
|
||||
final Schema elementSchema = avroSchema.getElementType();
|
||||
valueArray[i] = normalizeValue(array.get(i), elementSchema);
|
||||
}
|
||||
return valueArray;
|
||||
case MAP:
|
||||
final Map<?, ?> avroMap = (Map<?, ?>) value;
|
||||
final Map<String, Object> map = new HashMap<>(avroMap.size());
|
||||
for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
|
||||
Object obj = entry.getValue();
|
||||
if (obj instanceof Utf8 || obj instanceof CharSequence) {
|
||||
obj = obj.toString();
|
||||
}
|
||||
|
||||
final String key = entry.getKey().toString();
|
||||
obj = normalizeValue(obj, avroSchema.getValueType());
|
||||
|
||||
map.put(key, obj);
|
||||
}
|
||||
|
||||
final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
|
||||
final List<RecordField> mapFields = new ArrayList<>();
|
||||
for (final String key : map.keySet()) {
|
||||
mapFields.add(new RecordField(key, elementType));
|
||||
}
|
||||
final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
|
||||
return new MapRecord(mapSchema, map);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
}
|
|
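A brief usage sketch of the conversions above (the Avro schema literal is invented for the example; createAvroRecord declares IOException, so the caller must handle it):

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;

public class AvroTypeUtilExample {
    public static void main(final String[] args) throws Exception {
        final Schema avroSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

        // Avro schema -> NiFi RecordSchema
        final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);

        // NiFi Record -> Avro GenericRecord
        final Map<String, Object> values = new HashMap<>();
        values.put("name", "nifi");
        final GenericRecord avroRecord =
                AvroTypeUtil.createAvroRecord(new MapRecord(recordSchema, values), avroSchema);

        // Avro GenericRecord -> plain Map keyed by the RecordSchema's field names
        final Map<String, Object> converted = AvroTypeUtil.convertAvroRecordToMap(avroRecord, recordSchema);
    }
}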
@ -17,19 +17,18 @@
|
|||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
|
||||
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
|
||||
|
||||
|
@ -41,7 +40,7 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException {
|
||||
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
|
||||
final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (schemaText == null || schemaText.trim().isEmpty()) {
|
||||
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.avro.AvroSchemaValidator;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class SchemaAccessUtils {
|
||||
|
||||
public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
|
||||
"The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
|
||||
public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
|
||||
"The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
|
||||
+ "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
|
||||
public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
|
||||
"The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
|
||||
+ "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
|
||||
+ "found at https://github.com/hortonworks/registry");
|
||||
public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
|
||||
"The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
|
||||
|
||||
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
|
||||
.name("schema-registry")
|
||||
.displayName("Schema Registry")
|
||||
.description("Specifies the Controller Service to use for the Schema Registry")
|
||||
.identifiesControllerService(SchemaRegistry.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("schema-access-strategy")
|
||||
.displayName("Schema Access Strategy")
|
||||
.description("Specifies how to obtain the schema that is to be used for interpreting the data.")
|
||||
.allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
|
||||
.defaultValue(SCHEMA_NAME_PROPERTY.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
|
||||
.name("schema-name")
|
||||
.displayName("Schema Name")
|
||||
.description("Specifies the name of the schema to lookup in the Schema Registry property")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${schema.name}")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
|
||||
.name("schema-text")
|
||||
.displayName("Schema Text")
|
||||
.description("The text of an Avro-formatted Schema")
|
||||
.addValidator(new AvroSchemaValidator())
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${avro.schema}")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static Collection<ValidationResult> validateSchemaAccessStrategy(final ValidationContext validationContext, final String schemaAccessStrategyValue,
|
||||
final List<AllowableValue> schemaAccessStrategyValues) {
|
||||
if (isSchemaRegistryRequired(schemaAccessStrategyValue)) {
|
||||
final boolean registrySet = validationContext.getProperty(SCHEMA_REGISTRY).isSet();
|
||||
if (!registrySet) {
|
||||
final String schemaAccessStrategyName = getSchemaAccessStrategyName(schemaAccessStrategyValue, schemaAccessStrategyValues);
|
||||
|
||||
return Collections.singleton(new ValidationResult.Builder()
|
||||
.subject("Schema Registry")
|
||||
.explanation("The '" + schemaAccessStrategyName + "' Schema Access Strategy requires that the Schema Registry property be set.")
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
private static String getSchemaAccessStrategyName(final String schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) {
|
||||
for (final AllowableValue allowableValue : schemaAccessStrategyValues) {
|
||||
if (allowableValue.getValue().equalsIgnoreCase(schemaAccessValue)) {
|
||||
return allowableValue.getDisplayName();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static boolean isSchemaRegistryRequired(final String schemaAccessValue) {
|
||||
return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
|
||||
|| HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
|
||||
}
|
||||
|
||||
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
|
||||
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
|
||||
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
|
||||
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
|
||||
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
|
||||
return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
|
||||
return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
|
||||
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
|
||||
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
|
||||
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
|
||||
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
|
||||
return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
|
||||
return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
|
||||
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
|
||||
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
|
||||
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
|
||||
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
|
||||
return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
|
||||
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
|
||||
return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
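Taken together, the helpers above are meant to be resolved once at scheduling time and then applied per FlowFile. The fragment below is an orientation sketch only (it mirrors what AbstractPutHDFSRecord#onScheduled does later in this commit; 'flowFile' and 'in' are assumed to be supplied by the caller):

// Resolve the configured strategy once, e.g. in an @OnScheduled method.
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String accessValue = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
final SchemaAccessStrategy strategy = SchemaAccessUtils.getSchemaAccessStrategy(accessValue, schemaRegistry, context);

// Then, per FlowFile, use the strategy to obtain the schema for reading or writing records.
final RecordSchema schema = strategy.getSchema(flowFile, in);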
|
|
@ -0,0 +1,56 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-utils</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-hadoop-record-utils</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-hadoop-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-avro-record-utils</artifactId>
|
||||
</dependency>
|
||||
<!-- Other modules using nifi-hadoop-record-utils are expected to have the below dependencies available, typically through a NAR dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,279 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.commons.io.input.NullInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSettings;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
* Base processor for reading data from HDFS that can be fetched as records.
*/
|
||||
@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
|
||||
@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
|
||||
public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
|
||||
|
||||
public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
|
||||
.name("filename")
|
||||
.displayName("Filename")
|
||||
.description("The name of the file to retrieve")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${path}/${filename}")
|
||||
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("record-writer")
|
||||
.displayName("Record Writer")
|
||||
.description("The service for writing records to the FlowFile content")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles will be routed to this relationship once they have been updated with the content of the file")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved and trying again will likely not be helpful. "
|
||||
+ "This would occur, for instance, if the file is not found or if there is a permissions issue")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder()
|
||||
.name("retry")
|
||||
.description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, but might be able to be in the future if tried again. "
|
||||
+ "This generally indicates that the Fetch should be tried again.")
|
||||
.build();
|
||||
|
||||
public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";
|
||||
public static final String RECORD_COUNT_ATTR = "record.count";
|
||||
|
||||
private volatile Set<Relationship> fetchHdfsRecordRelationships;
|
||||
private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
|
||||
|
||||
@Override
|
||||
protected final void init(final ProcessorInitializationContext context) {
|
||||
super.init(context);
|
||||
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_SUCCESS);
|
||||
rels.add(REL_RETRY);
|
||||
rels.add(REL_FAILURE);
|
||||
this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> props = new ArrayList<>(properties);
|
||||
props.add(FILENAME);
|
||||
props.add(RECORD_WRITER);
|
||||
props.addAll(getAdditionalProperties());
|
||||
this.fetchHdfsRecordProperties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows sub-classes to add additional properties, called from initialize.
|
||||
*
|
||||
* @return additional properties to add to the overall list
|
||||
*/
|
||||
public List<PropertyDescriptor> getAdditionalProperties() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Set<Relationship> getRelationships() {
|
||||
return fetchHdfsRecordRelationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return fetchHdfsRecordProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sub-classes provide the appropriate HDFSRecordReader.
|
||||
*
|
||||
* @param context the process context to obtain additional configuration
|
||||
* @param flowFile the flow file being written
|
||||
* @param conf the Configuration instance
|
||||
* @param path the path to write to
|
||||
* @return the HDFSRecordWriter
|
||||
* @throws IOException if an error occurs creating the writer
|
||||
*/
|
||||
public abstract HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
|
||||
throws IOException;
|
||||
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
// do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
|
||||
final FileSystem fileSystem = getFileSystem();
|
||||
final Configuration configuration = getConfiguration();
|
||||
final UserGroupInformation ugi = getUserGroupInformation();
|
||||
|
||||
if (configuration == null || fileSystem == null || ugi == null) {
|
||||
getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final FlowFile originalFlowFile = session.get();
|
||||
if (originalFlowFile == null) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
ugi.doAs((PrivilegedAction<Object>)() -> {
|
||||
FlowFile child = null;
|
||||
final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
|
||||
try {
|
||||
final Path path = new Path(filenameValue);
|
||||
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
|
||||
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
|
||||
|
||||
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new NullInputStream(0));
|
||||
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
|
||||
// use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
|
||||
child = session.create(originalFlowFile);
|
||||
child = session.write(child, (final OutputStream rawOut) -> {
|
||||
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
|
||||
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
|
||||
|
||||
final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
|
||||
|
||||
final RecordSet recordSet = new RecordSet() {
|
||||
@Override
|
||||
public RecordSchema getSchema() throws IOException {
|
||||
return emptySchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Record next() throws IOException {
|
||||
return recordReader.nextRecord();
|
||||
}
|
||||
};
|
||||
|
||||
writeResult.set(recordSetWriter.write(recordSet, out));
|
||||
} catch (Exception e) {
|
||||
exceptionHolder.set(e);
|
||||
}
|
||||
});
|
||||
|
||||
stopWatch.stop();
|
||||
|
||||
// if any errors happened within the session.write then throw the exception so we jump
|
||||
// into one of the appropriate catch blocks below
|
||||
if (exceptionHolder.get() != null) {
|
||||
throw exceptionHolder.get();
|
||||
}
|
||||
|
||||
FlowFile successFlowFile = postProcess(context, session, child, path);
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
|
||||
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
|
||||
successFlowFile = session.putAllAttributes(successFlowFile, attributes);
|
||||
|
||||
final URI uri = path.toUri();
|
||||
getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
|
||||
session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
|
||||
session.transfer(successFlowFile, REL_SUCCESS);
|
||||
session.remove(originalFlowFile);
|
||||
return null;
|
||||
|
||||
} catch (final FileNotFoundException | AccessControlException e) {
|
||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
|
||||
final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage());
|
||||
session.transfer(failureFlowFile, REL_FAILURE);
|
||||
} catch (final IOException | FlowFileAccessException e) {
|
||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
|
||||
session.transfer(session.penalize(originalFlowFile), REL_RETRY);
|
||||
context.yield();
|
||||
} catch (final Throwable t) {
|
||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
|
||||
final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage());
|
||||
session.transfer(failureFlowFile, REL_FAILURE);
|
||||
}
|
||||
|
||||
// if we got this far then we weren't successful so we need to clean up the child flow file if it got initialized
|
||||
if (child != null) {
|
||||
session.remove(child);
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
* This method will be called after the content has been successfully fetched, in order to give
* sub-classes a chance to take action before the FlowFile is transferred to success.
*
* @param context the context
* @param session the session
* @param flowFile the flow file being processed
* @param fetchPath the path that was fetched
* @return an updated FlowFile reference
*/
|
||||
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path fetchPath) {
|
||||
return flowFile;
|
||||
}
|
||||
}
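To make the contract concrete, a hypothetical subclass only has to supply an HDFSRecordReader; relationship handling, Kerberos re-login, and child FlowFile management are inherited. The class below is an illustrative sketch (imports as in the class above; the empty iterator stands in for a real format-specific reader) and is not part of this commit:

public class FetchExampleRecords extends AbstractFetchHDFSRecord {

    @Override
    public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile,
            final Configuration conf, final Path path) throws IOException {
        // A real implementation would open the file at 'path' using 'conf'.
        final java.util.Iterator<Record> records = java.util.Collections.<Record>emptyIterator();

        return new HDFSRecordReader() {
            @Override
            public Record nextRecord() {
                // Returning null signals end of data, which is what the RecordSet adapter above relies on.
                return records.hasNext() ? records.next() : null;
            }

            @Override
            public void close() {
                // Close the underlying stream or reader here.
            }
        };
    }
}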
|
|
@ -0,0 +1,541 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||
import org.apache.nifi.annotation.configuration.DefaultSettings;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.hadoop.exception.FailureException;
|
||||
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
|
||||
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
|
||||
import org.apache.nifi.schema.access.SchemaAccessStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
|
||||
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
|
||||
|
||||
/**
|
||||
* Base class for processors that write Records to HDFS.
|
||||
*/
|
||||
@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
|
||||
@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
|
||||
public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
|
||||
|
||||
|
||||
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("compression-type")
|
||||
.displayName("Compression Type")
|
||||
.description("The type of compression for the file being written.")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
|
||||
.name("overwrite")
|
||||
.displayName("Overwrite Files")
|
||||
.description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
|
||||
"flow files will be routed to failure when a file exists in the same directory with the same name.")
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
|
||||
.name("permissions-umask")
|
||||
.displayName("Permissions umask")
|
||||
.description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
|
||||
"This overrides the Hadoop Configuration dfs.umaskmode")
|
||||
.addValidator(HadoopValidators.UMASK_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
|
||||
.name("remote-owner")
|
||||
.displayName("Remote Owner")
|
||||
.description("Changes the owner of the HDFS file to this value after it is written. " +
|
||||
"This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
|
||||
.name("remote-group")
|
||||
.displayName("Remote Group")
|
||||
.description("Changes the group of the HDFS file to this value after it is written. " +
|
||||
"This only works if NiFi is running as a user that has HDFS super user privilege to change group")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||
.name("record-reader")
|
||||
.displayName("Record Reader")
|
||||
.description("The service for reading records from incoming flow files.")
|
||||
.identifiesControllerService(RecordReaderFactory.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("Flow Files that have been successfully processed are transferred to this relationship")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder()
|
||||
.name("retry")
|
||||
.description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship")
|
||||
.build();
|
||||
|
||||
public static final String RECORD_COUNT_ATTR = "record.count";
|
||||
|
||||
private volatile String remoteOwner;
|
||||
private volatile String remoteGroup;
|
||||
private volatile SchemaAccessStrategy schemaAccessStrategy;
|
||||
|
||||
private volatile Set<Relationship> putHdfsRecordRelationships;
|
||||
private volatile List<PropertyDescriptor> putHdfsRecordProperties;
|
||||
|
||||
private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
|
||||
SCHEMA_NAME_PROPERTY,
|
||||
SCHEMA_TEXT_PROPERTY,
|
||||
HWX_SCHEMA_REF_ATTRIBUTES,
|
||||
HWX_CONTENT_ENCODED_SCHEMA
|
||||
));
|
||||
|
||||
|
||||
@Override
|
||||
protected final void init(final ProcessorInitializationContext context) {
|
||||
super.init(context);
|
||||
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_SUCCESS);
|
||||
rels.add(REL_RETRY);
|
||||
rels.add(REL_FAILURE);
|
||||
this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> props = new ArrayList<>(properties);
|
||||
props.add(RECORD_READER);
|
||||
|
||||
props.add(new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(DIRECTORY)
|
||||
.description("The parent directory to which files should be written. Will be created if it doesn't exist.")
|
||||
.build());
|
||||
|
||||
final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
|
||||
|
||||
props.add(new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
|
||||
.description("Specifies how to obtain the schema that is to be used for writing the data.")
|
||||
.allowableValues(strategies)
|
||||
.defaultValue(getDefaultSchemaAccessStrategy().getValue())
|
||||
.build());
|
||||
|
||||
props.add(SCHEMA_REGISTRY);
|
||||
props.add(SCHEMA_NAME);
|
||||
props.add(SCHEMA_TEXT);
|
||||
|
||||
final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
|
||||
|
||||
props.add(new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(COMPRESSION_TYPE)
|
||||
.allowableValues(compressionTypes)
|
||||
.defaultValue(getDefaultCompressionType(context))
|
||||
.build());
|
||||
|
||||
props.add(OVERWRITE);
|
||||
props.add(UMASK);
|
||||
props.add(REMOTE_GROUP);
|
||||
props.add(REMOTE_OWNER);
|
||||
props.addAll(getAdditionalProperties());
|
||||
this.putHdfsRecordProperties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
protected List<AllowableValue> getSchemaAccessStrategyValues() {
|
||||
return strategyList;
|
||||
}
|
||||
|
||||
protected AllowableValue getDefaultSchemaAccessStrategy() {
|
||||
return SCHEMA_NAME_PROPERTY;
|
||||
}
|
||||
|
||||
private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
|
||||
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param context the initialization context
|
||||
* @return the possible compression types
|
||||
*/
|
||||
public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
|
||||
|
||||
/**
|
||||
* @param context the initialization context
|
||||
* @return the default compression type
|
||||
*/
|
||||
public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
|
||||
|
||||
/**
|
||||
* Allows sub-classes to add additional properties, called from initialize.
|
||||
*
|
||||
* @return additional properties to add to the overall list
|
||||
*/
|
||||
public List<PropertyDescriptor> getAdditionalProperties() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Set<Relationship> getRelationships() {
|
||||
return putHdfsRecordRelationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return putHdfsRecordProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
|
||||
return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public final void onScheduled(final ProcessContext context) throws IOException {
|
||||
super.abstractOnScheduled(context);
|
||||
|
||||
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
|
||||
|
||||
final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
|
||||
final String schemaAccess = context.getProperty(descriptor).getValue();
|
||||
this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
|
||||
|
||||
this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
|
||||
this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
|
||||
|
||||
// Set umask once, to avoid thread safety issues doing it in onTrigger
|
||||
final PropertyValue umaskProp = context.getProperty(UMASK);
|
||||
final short dfsUmask;
|
||||
if (umaskProp.isSet()) {
|
||||
dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
|
||||
} else {
|
||||
dfsUmask = FsPermission.DEFAULT_UMASK;
|
||||
}
|
||||
final Configuration conf = getConfiguration();
|
||||
FsPermission.setUMask(conf, new FsPermission(dfsUmask));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sub-classes provide the appropriate HDFSRecordWriter.
|
||||
*
|
||||
* @param context the process context to obtain additional configuration
|
||||
* @param flowFile the flow file being written
|
||||
* @param conf the Configuration instance
|
||||
* @param path the path to write to
|
||||
* @param schema the schema for writing
|
||||
* @return the HDFSRecordWriter
|
||||
* @throws IOException if an error occurs creating the writer or processing the schema
|
||||
*/
|
||||
public abstract HDFSRecordWriter createHDFSRecordWriter(
|
||||
final ProcessContext context,
|
||||
final FlowFile flowFile,
|
||||
final Configuration conf,
|
||||
final Path path,
|
||||
final RecordSchema schema) throws IOException, SchemaNotFoundException;
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
// do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
|
||||
final FileSystem fileSystem = getFileSystem();
|
||||
final Configuration configuration = getConfiguration();
|
||||
final UserGroupInformation ugi = getUserGroupInformation();
|
||||
|
||||
if (configuration == null || fileSystem == null || ugi == null) {
|
||||
getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
ugi.doAs((PrivilegedAction<Object>)() -> {
|
||||
Path tempDotCopyFile = null;
|
||||
FlowFile putFlowFile = flowFile;
|
||||
try {
|
||||
final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
|
||||
final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
|
||||
|
||||
// create the directory if it doesn't exist
|
||||
final Path directoryPath = new Path(directoryValue);
|
||||
createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
|
||||
|
||||
// write to tempFile first and on success rename to destFile
|
||||
final Path tempFile = new Path(directoryPath, "." + filenameValue);
|
||||
final Path destFile = new Path(directoryPath, filenameValue);
|
||||
|
||||
final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
|
||||
final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
|
||||
|
||||
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
|
||||
if (destinationExists && !shouldOverwrite) {
|
||||
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
|
||||
getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
|
||||
return null;
|
||||
}
|
||||
|
||||
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
|
||||
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
|
||||
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
|
||||
final FlowFile flowFileIn = putFlowFile;
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
|
||||
// Read records from the incoming FlowFile and write them to the tempFile
|
||||
session.read(putFlowFile, (final InputStream rawIn) -> {
|
||||
RecordReader recordReader = null;
|
||||
HDFSRecordWriter recordWriter = null;
|
||||
|
||||
try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
|
||||
final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
|
||||
recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
|
||||
|
||||
// if we fail to create the RecordReader then we want to route to failure, so we need to
|
||||
// handle this separately from the other IOExceptions which normally route to retry
|
||||
try {
|
||||
recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
|
||||
} catch (Exception e) {
|
||||
final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
|
||||
exceptionHolder.set(rrfe);
|
||||
return;
|
||||
}
|
||||
|
||||
final RecordSet recordSet = recordReader.createRecordSet();
|
||||
writeResult.set(recordWriter.write(recordSet));
|
||||
|
||||
} catch (Exception e) {
|
||||
exceptionHolder.set(e);
|
||||
} finally {
|
||||
IOUtils.closeQuietly(recordReader);
|
||||
IOUtils.closeQuietly(recordWriter);
|
||||
}
|
||||
});
|
||||
stopWatch.stop();
|
||||
|
||||
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
|
||||
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||
tempDotCopyFile = tempFile;
|
||||
|
||||
// if any errors happened within the session.read then throw the exception so we jump
|
||||
// into one of the appropriate catch blocks below
|
||||
if (exceptionHolder.get() != null) {
|
||||
throw exceptionHolder.get();
|
||||
}
|
||||
|
||||
// Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
|
||||
rename(fileSystem, tempFile, destFile);
|
||||
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
|
||||
|
||||
getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
|
||||
|
||||
putFlowFile = postProcess(context, session, putFlowFile, destFile);
|
||||
|
||||
final String outputPath = destFile.toString();
|
||||
final String newFilename = destFile.getName();
|
||||
final String hdfsPath = destFile.getParent().toString();
|
||||
|
||||
// Update the filename and absolute path attributes
|
||||
final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
|
||||
attributes.put(CoreAttributes.FILENAME.key(), newFilename);
|
||||
attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
|
||||
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
|
||||
putFlowFile = session.putAllAttributes(putFlowFile, attributes);
|
||||
|
||||
// Send a provenance event and transfer to success
|
||||
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
|
||||
session.getProvenanceReporter().send(putFlowFile, transitUri);
|
||||
session.transfer(putFlowFile, REL_SUCCESS);
|
||||
|
||||
} catch (IOException | FlowFileAccessException e) {
|
||||
deleteQuietly(fileSystem, tempDotCopyFile);
|
||||
getLogger().error("Failed to write due to {}", new Object[]{e});
|
||||
session.transfer(session.penalize(putFlowFile), REL_RETRY);
|
||||
context.yield();
|
||||
} catch (Throwable t) {
|
||||
deleteQuietly(fileSystem, tempDotCopyFile);
|
||||
getLogger().error("Failed to write due to {}", new Object[]{t});
|
||||
session.transfer(putFlowFile, REL_FAILURE);
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will be called after successfully writing to the destination file and renaming the file to its final name
|
||||
* in order to give sub-classes a chance to take action before transferring to success.
|
||||
*
|
||||
* @param context the context
|
||||
* @param session the session
|
||||
* @param flowFile the flow file being processed
|
||||
* @param destFile the destination file written to
|
||||
* @return an updated FlowFile reference
|
||||
*/
|
||||
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to rename srcFile to destFile up to 10 times, with a 200ms sleep in between each attempt.
|
||||
*
|
||||
* If the file has not been renamed after 10 attempts, a FailureException is thrown.
|
||||
*
|
||||
* @param fileSystem the file system where the files are located
|
||||
* @param srcFile the source file
|
||||
* @param destFile the destination file to rename the source to
|
||||
* @throws IOException if IOException happens while attempting to rename
|
||||
* @throws InterruptedException if renaming is interrupted
|
||||
* @throws FailureException if the file couldn't be renamed after 10 attempts
|
||||
*/
|
||||
protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
|
||||
boolean renamed = false;
|
||||
for (int i = 0; i < 10; i++) { // try to rename multiple times.
|
||||
if (fileSystem.rename(srcFile, destFile)) {
|
||||
renamed = true;
|
||||
break;// rename was successful
|
||||
}
|
||||
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
|
||||
}
|
||||
if (!renamed) {
|
||||
fileSystem.delete(srcFile, false);
|
||||
throw new FailureException("Could not rename file " + srcFile + " to its final filename");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the given file from the given filesystem. Any exceptions that are encountered will be caught and logged, but not thrown.
|
||||
*
|
||||
* @param fileSystem the filesystem where the file exists
|
||||
* @param file the file to delete
|
||||
*/
|
||||
protected void deleteQuietly(final FileSystem fileSystem, final Path file) {
|
||||
if (file != null) {
|
||||
try {
|
||||
fileSystem.delete(file, false);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Unable to remove file {} due to {}", new Object[]{file, e});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Changes the ownership of the given file.
|
||||
*
|
||||
* @param fileSystem the filesystem where the file exists
|
||||
* @param path the file to change ownership on
|
||||
* @param remoteOwner the new owner for the file
|
||||
* @param remoteGroup the new group for the file
|
||||
*/
|
||||
protected void changeOwner(final FileSystem fileSystem, final Path path, final String remoteOwner, final String remoteGroup) {
|
||||
try {
|
||||
// Change owner and group of file if configured to do so
|
||||
if (remoteOwner != null || remoteGroup != null) {
|
||||
fileSystem.setOwner(path, remoteOwner, remoteGroup);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the given directory and changes the ownership to the specified owner/group.
|
||||
*
|
||||
* @param fileSystem the filesystem to create the directory on
|
||||
* @param directory the directory to create
|
||||
* @param remoteOwner the owner for changing ownership of the directory
|
||||
* @param remoteGroup the group for changing ownership of the directory
|
||||
* @throws IOException if an error occurs obtaining the file status or issuing the mkdir command
|
||||
* @throws FailureException if the directory could not be created
|
||||
*/
|
||||
protected void createDirectory(final FileSystem fileSystem, final Path directory, final String remoteOwner, final String remoteGroup) throws IOException, FailureException {
|
||||
try {
|
||||
if (!fileSystem.getFileStatus(directory).isDirectory()) {
|
||||
throw new FailureException(directory.toString() + " already exists and is not a directory");
|
||||
}
|
||||
} catch (FileNotFoundException fe) {
|
||||
if (!fileSystem.mkdirs(directory)) {
|
||||
throw new FailureException(directory.toString() + " could not be created");
|
||||
}
|
||||
changeOwner(fileSystem, directory, remoteOwner, remoteGroup);
|
||||
}
|
||||
}
|
||||
|
||||
}
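Similarly, a hypothetical subclass of AbstractPutHDFSRecord only supplies its compression options and a format-specific HDFSRecordWriter. The sketch below is illustrative (the names and the no-op writer are assumptions, imports as in the class above) and is not part of this commit:

public class PutExampleRecords extends AbstractPutHDFSRecord {

    private static final AllowableValue NONE = new AllowableValue("NONE", "None", "No compression");

    @Override
    public List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context) {
        return Collections.singletonList(NONE);
    }

    @Override
    public String getDefaultCompressionType(final ProcessorInitializationContext context) {
        return NONE.getValue();
    }

    @Override
    public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile,
            final Configuration conf, final Path path, final RecordSchema schema) throws IOException {
        // A real implementation would open a format-specific writer (e.g. Parquet) for 'path';
        // this stand-in discards records so the sketch stays self-contained.
        return new HDFSRecordWriter() {
            @Override
            public void write(final org.apache.nifi.serialization.record.Record record) {
                // Hand the record to the underlying file writer here.
            }

            @Override
            public void close() {
                // Flush and close the underlying file writer here.
            }
        };
    }
}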
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop.exception;
|
||||
|
||||
/**
|
||||
* An exception to represent an error that occurred during a put to HDFS and should not be retried.
|
||||
*/
|
||||
public class FailureException extends Exception {
|
||||
|
||||
public FailureException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FailureException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop.exception;
|
||||
|
||||
/**
|
||||
* Thrown when a schema cannot be parsed into the expected type.
|
||||
*/
|
||||
public class InvalidSchemaException extends FailureException {
|
||||
|
||||
public InvalidSchemaException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public InvalidSchemaException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop.exception;
|
||||
|
||||
/**
|
||||
* Thrown when an error occurs while using the record reader factory to create a record reader.
|
||||
*/
|
||||
public class RecordReaderFactoryException extends FailureException {
|
||||
|
||||
public RecordReaderFactoryException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public RecordReaderFactoryException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop.record;
|
||||
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Reads Records from HDFS.
|
||||
*/
|
||||
public interface HDFSRecordReader extends Closeable {
|
||||
|
||||
Record nextRecord() throws IOException;
|
||||
|
||||
}
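By convention nextRecord() returns null once the data is exhausted, which is what the RecordSet adapter in AbstractFetchHDFSRecord relies on. A typical consuming loop therefore looks like the following sketch (context, flowFile, conf, and path assumed to be in scope):

try (final HDFSRecordReader reader = createHDFSRecordReader(context, flowFile, conf, path)) {
    Record record;
    while ((record = reader.nextRecord()) != null) {
        // process the record
    }
}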
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.hadoop.record;
|
||||
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Writes Records to HDFS.
|
||||
*/
|
||||
public interface HDFSRecordWriter extends Closeable {
|
||||
|
||||
/**
|
||||
* @param record the record to write
|
||||
* @throws IOException if an I/O error happens writing the record
|
||||
*/
|
||||
void write(Record record) throws IOException;
|
||||
|
||||
/**
|
||||
* @param recordSet the RecordSet to write
|
||||
* @return the result of writing the record set
|
||||
* @throws IOException if an I/O error happens reading from the RecordSet, or writing a Record
|
||||
*/
|
||||
default WriteResult write(final RecordSet recordSet) throws IOException {
|
||||
int recordCount = 0;
|
||||
|
||||
Record record;
|
||||
while ((record = recordSet.next()) != null) {
|
||||
write(record);
|
||||
recordCount++;
|
||||
}
|
||||
|
||||
return WriteResult.of(recordCount, Collections.emptyMap());
|
||||
}
|
||||
|
||||
}
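Because the RecordSet variant is a default method, an implementation only needs to handle single records. A minimal illustrative sketch (not part of this commit) that simply counts what it writes; a real writer would delegate to a format-specific library instead:

public class CountingHDFSRecordWriter implements HDFSRecordWriter {

    private int written = 0;

    @Override
    public void write(final Record record) throws IOException {
        // Hand the record to the underlying file writer here.
        written++;
    }

    @Override
    public void close() throws IOException {
        // Flush and close the underlying file writer here.
    }

    public int getWrittenCount() {
        return written;
    }
}

Calling the default write(RecordSet) on such an instance then returns a WriteResult whose record count matches getWrittenCount().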
|
|
@ -0,0 +1,45 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-utils</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-mock-record-utils</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<!-- Other modules using nifi-mock-record-utils are expected to have this API available, typically through a NAR dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard.util.record;
|
||||
package org.apache.nifi.serialization.record;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|