NIFI-1868: Add PutHiveStreaming processor

Signed-off-by: Bryan Bende <bbende@apache.org>
Matt Burgess 2016-07-21 11:59:41 -04:00 committed by Bryan Bende
parent cda4310ad8
commit c2019b9339
21 changed files with 2270 additions and 87 deletions

View File

@@ -26,8 +26,8 @@
<packaging>jar</packaging>
<properties>
<hive.version>2.0.0</hive.version>
<orc.version>1.1.1</orc.version>
<hive.version>2.0.1</hive.version>
<orc.version>1.1.2</orc.version>
</properties>
@@ -144,6 +144,30 @@
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@@ -30,13 +30,16 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.ValidationResources;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -74,7 +77,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
.description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
.required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("hive-db-user")
@@ -116,10 +119,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
.sensitive(false)
.build();
static final long TICKET_RENEWAL_PERIOD = 60000;
private volatile UserGroupInformation ugi;
private volatile KerberosTicketRenewer renewer;
private static final long TICKET_RENEWAL_PERIOD = 60000;
private final static List<PropertyDescriptor> properties;
private static KerberosProperties kerberosProperties;
@@ -131,6 +131,8 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
private volatile BasicDataSource dataSource;
private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
private volatile UserGroupInformation ugi;
static {
kerberosProperties = KerberosProperties.create(NiFiProperties.getInstance());
@@ -160,22 +162,9 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
if (confFileProvided) {
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
getLogger().debug("Reloading validation resources");
resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
validationResourceHolder.set(resources);
}
final Configuration hiveConfig = resources.getConfiguration();
final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
this.getClass().getSimpleName(), hiveConfig, principal, keytab, getLogger()));
final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger()));
}
return problems;
@@ -194,12 +183,14 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
* @throws InitializationException if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException, IOException {
public void onConfigured(final ConfigurationContext context) throws InitializationException {
connectionUrl = context.getProperty(DATABASE_URL).getValue();
ComponentLog log = getLogger();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
final Configuration hiveConfig = HiveJdbcCommon.getConfigurationFromFiles(configFiles);
final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
// add any dynamic properties to the Hive configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
@@ -214,15 +205,15 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
try {
ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
}
getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
// if we got here then we have a ugi so start a renewer
if (ugi != null) {
final String id = getClass().getSimpleName();
renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger());
}
}
final String user = context.getProperty(DB_USER).getValue();
final String passw = context.getProperty(DB_PASSWORD).getValue();
@@ -248,9 +239,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
@OnDisabled
public void shutdown() {
if (renewer != null) {
renewer.stop();
}
hiveConfigurator.stopRenewer();
try {
dataSource.close();
@@ -290,22 +279,4 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
return connectionUrl;
}
private static class ValidationResources {
private final String configResources;
private final Configuration configuration;
public ValidationResources(String configResources, Configuration configuration) {
this.configResources = configResources;
this.configuration = configuration;
}
public String getConfigResources() {
return configResources;
}
public Configuration getConfiguration() {
return configuration;
}
}
}
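For context, a hedged sketch of how a downstream processor typically consumes this controller service once it is enabled. The HIVE_DBCP_SERVICE property descriptor and the HiveQL are illustrative only; asControllerService/getConnection follow the standard NiFi DBCP service pattern:

    // Illustrative usage: look up the configured service and borrow a JDBC connection from the pool.
    final HiveDBCPService hiveService = context.getProperty(HIVE_DBCP_SERVICE)   // hypothetical descriptor
            .asControllerService(HiveDBCPService.class);
    try (final Connection conn = hiveService.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.execute("SELECT 1");  // placeholder HiveQL
    }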

View File

@@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.orc.OrcFlowFileWriter;
import org.apache.nifi.util.orc.OrcUtils;
import org.apache.orc.CompressionKind;
@@ -98,7 +99,7 @@ public class ConvertAvroToORC extends AbstractProcessor {
.displayName("ORC Configuration Resources")
.description("A file or comma separated list of files which contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Please see the ORC documentation for more details.")
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
.required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
public static final PropertyDescriptor STRIPE_SIZE = new PropertyDescriptor.Builder()
.name("orc-stripe-size")

View File

@@ -0,0 +1,657 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
* This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
*/
@Tags({"hive", "streaming", "put", "database", "store"})
@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
+ "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
+ "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
@WritesAttributes({
@WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
})
public class PutHiveStreaming extends AbstractProcessor {
// Attributes
public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
// Validators
private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
String reason = null;
try {
final int intVal = Integer.parseInt(value);
if (intVal < 2) {
reason = "value is less than 2";
}
} catch (final NumberFormatException e) {
reason = "value is not a valid integer";
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
};
// Properties
public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
.name("hive-stream-metastore-uri")
.displayName("Hive Metastore URI")
.description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+ "Hive metastore is 9043.")
.required(true)
.addValidator(StandardValidators.URI_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // must not start or end with '/'
.build();
public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("hive-config-resources")
.displayName("Hive Configuration Resources")
.description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
.required(false)
.addValidator(HiveUtils.createMultipleFilesExistValidator())
.build();
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
.name("hive-stream-database-name")
.displayName("Database Name")
.description("The name of the database in which to put the data.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("hive-stream-table-name")
.displayName("Table Name")
.description("The name of the database table in which to put the data.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
.name("hive-stream-partition-cols")
.displayName("Partition Columns")
.description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+ "correspond exactly to the order of partition columns specified during the table creation.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
.build();
public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
.name("hive-stream-autocreate-partition")
.displayName("Auto-Create Partitions")
.description("Flag indicating whether partitions should be automatically created")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
.name("hive-stream-max-open-connections")
.displayName("Max Open Connections")
.description("The maximum number of open connections that can be allocated from this pool at the same time, "
+ "or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
.name("hive-stream-heartbeat-interval")
.displayName("Heartbeat Interval")
.description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
+ "A value of 0 indicates that no heartbeat should be sent.")
.defaultValue("60")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
.name("hive-stream-transactions-per-batch")
.displayName("Transactions per Batch")
.description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(GREATER_THAN_ONE_VALIDATOR)
.defaultValue("100")
.build();
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the database is successfully updated")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private static final long TICKET_RENEWAL_PERIOD = 60000;
protected KerberosProperties kerberosProperties;
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
protected volatile UserGroupInformation ugi;
protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
protected HiveOptions options;
protected ExecutorService callTimeoutPool;
protected transient Timer heartBeatTimer;
protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
protected Map<HiveEndPoint, HiveWriter> allWriters;
/*
* Will ensure that the list of property descriptors is built only once.
* Will also create a Set of relationships
*/
static {
propertyDescriptors = new ArrayList<>();
propertyDescriptors.add(METASTORE_URI);
propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
propertyDescriptors.add(DB_NAME);
propertyDescriptors.add(TABLE_NAME);
propertyDescriptors.add(PARTITION_COLUMNS);
propertyDescriptors.add(AUTOCREATE_PARTITIONS);
propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
propertyDescriptors.add(HEARTBEAT_INTERVAL);
propertyDescriptors.add(TXNS_PER_BATCH);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
_relationships.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected void init(ProcessorInitializationContext context) {
kerberosProperties = getKerberosProperties();
propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void setup(final ProcessContext context) {
ComponentLog log = getLogger();
final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
final String dbName = context.getProperty(DB_NAME).getValue();
final String tableName = context.getProperty(TABLE_NAME).getValue();
final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
// add any dynamic properties to the Hive configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
hiveConfig.set(descriptor.getName(), entry.getValue());
}
}
options = new HiveOptions(metastoreUri, dbName, tableName)
.withTxnsPerBatch(txnsPerBatch)
.withAutoCreatePartitions(autoCreatePartitions)
.withMaxOpenConnections(maxConnections)
.withHeartBeatInterval(heartbeatInterval);
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
try {
ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
} catch (AuthenticationFailedException ae) {
throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
}
log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
}
allWriters = new ConcurrentHashMap<>();
String timeoutName = "put-hive-streaming-%d";
this.callTimeoutPool = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
sendHeartBeat.set(true);
heartBeatTimer = new Timer();
setupHeartBeatTimer();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog log = getLogger();
try {
final List<String> partitionColumnList;
String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
if (StringUtils.isEmpty(partitionColumns)) {
partitionColumnList = Collections.emptyList();
} else {
String[] partitionCols = partitionColumns.split(",");
partitionColumnList = new ArrayList<>(partitionCols.length);
for (String col : partitionCols) {
partitionColumnList.add(col.trim());
}
}
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
int recordCount = 0;
final List<HiveStreamingRecord> records = new LinkedList<>();
session.read(flowFile, in -> {
try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
GenericRecord currRecord;
while (reader.hasNext()) {
currRecord = reader.next();
List<String> partitionValues = new ArrayList<>();
for (String partition : partitionColumnList) {
Object partitionValue = currRecord.get(partition);
if (partitionValue == null) {
throw new IOException("Partition column '" + partition + "' not found in Avro record");
}
partitionValues.add(partitionValue.toString());
}
List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) {
JSONObject obj = new JSONObject();
for (Schema.Field field : fields) {
String fieldName = field.name();
// Skip fields that are partition columns; those values were extracted above to create an EndPoint
if (!partitionColumnList.contains(fieldName)) {
Object value = currRecord.get(fieldName);
try {
obj.put(fieldName, value);
} catch (JSONException je) {
throw new IOException(je);
}
}
}
records.add(new HiveStreamingRecord(partitionValues, obj));
}
}
}
});
// Write all records to Hive Streaming
for (HiveStreamingRecord record : records) {
HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
HiveWriter writer = getOrCreateWriter(endPoint);
writer.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
recordCount++;
}
flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount));
flushAllWriters(true);
session.getProvenanceReporter().send(flowFile, options.getMetaStoreURI());
session.transfer(flowFile, REL_SUCCESS);
// Restore the original class loader; not strictly necessary, but good practice since this task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
} catch (HiveWriter.CommitFailure commitFailure) {
log.error("Error committing to Hive", commitFailure);
session.transfer(flowFile, REL_FAILURE);
} catch (HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure txnFailure) {
log.error("Hive Streaming Transaction Failure", txnFailure);
session.transfer(flowFile, REL_FAILURE);
} catch (InterruptedException e) {
log.error("Hive Streaming Interrupted, flow file will be penalized and routed to retry", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (ConnectionError | HiveWriter.ConnectFailure ce) {
log.error("Error while connecting via Hive Streaming, flow file will be penalized and routed to retry", ce);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (SerializationError se) {
log.error("Serialization exception occurred, record not written to Hive.", se);
session.transfer(flowFile, REL_FAILURE);
} catch (HiveWriter.WriteFailure wf) {
log.error("Error while writing record to Hive Streaming", wf);
abortAndCloseWriters();
session.transfer(flowFile, REL_FAILURE);
}
}
@OnStopped
public void cleanup() {
ComponentLog log = getLogger();
sendHeartBeat.set(false);
for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
HiveWriter w = entry.getValue();
w.flushAndClose();
} catch (Exception ex) {
log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
callTimeoutPool.shutdown();
try {
while (!callTimeoutPool.isTerminated()) {
callTimeoutPool.awaitTermination(
options.getCallTimeOut(), TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
log.warn("shutdown interrupted on " + callTimeoutPool, t);
}
callTimeoutPool = null;
}
private void setupHeartBeatTimer() {
if (options.getHeartBeatInterval() > 0) {
final ComponentLog log = getLogger();
heartBeatTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (sendHeartBeat.get()) {
log.debug("Start sending heartbeat on all writers");
sendHeartBeatOnAllWriters();
setupHeartBeatTimer();
}
} catch (Exception e) {
log.warn("Failed to heartbeat on HiveWriter ", e);
}
}
}, options.getHeartBeatInterval() * 1000);
}
}
private void sendHeartBeatOnAllWriters() throws InterruptedException {
for (HiveWriter writer : allWriters.values()) {
writer.heartBeat();
}
}
private void flushAllWriters(boolean rollToNext)
throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
for (HiveWriter writer : allWriters.values()) {
writer.flush(rollToNext);
}
}
private void abortAndCloseWriters() {
try {
abortAllWriters();
closeAllWriters();
} catch (Exception ie) {
getLogger().warn("unable to close hive connections. ", ie);
}
}
/**
* Abort current Txn on all writers
*/
private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
entry.getValue().abort();
} catch (Exception e) {
getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e);
}
}
}
/**
* Closes all writers and remove them from cache
*/
private void closeAllWriters() {
//1) Retire writers
for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
entry.getValue().close();
} catch (Exception e) {
getLogger().warn("unable to close writers. ", e);
}
}
//2) Clear cache
allWriters.clear();
}
private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
ComponentLog log = getLogger();
try {
HiveWriter writer = allWriters.get(endPoint);
if (writer == null) {
log.debug("Creating Writer to Hive end point : " + endPoint);
writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{allWriters.size(), options.getMaxOpenConnections()});
int retired = retireIdleWriters();
if (retired == 0) {
retireEldestWriter();
}
}
allWriters.put(endPoint, writer);
HiveUtils.logAllHiveEndPoints(allWriters);
}
return writer;
} catch (HiveWriter.ConnectFailure e) {
log.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
throw e;
}
}
/**
* Locate writer that has not been used for longest time and retire it
*/
private void retireEldestWriter() {
ComponentLog log = getLogger();
log.info("Attempting close eldest writers");
long oldestTimeStamp = System.currentTimeMillis();
HiveEndPoint eldest = null;
for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
if (entry.getValue().getLastUsed() < oldestTimeStamp) {
eldest = entry.getKey();
oldestTimeStamp = entry.getValue().getLastUsed();
}
}
try {
log.info("Closing least used Writer to Hive end point : " + eldest);
allWriters.remove(eldest).flushAndClose();
} catch (IOException e) {
log.warn("Failed to close writer for end point: " + eldest, e);
} catch (InterruptedException e) {
log.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
}
}
/**
* Locate all writers past idle timeout and retire them
*
* @return number of writers retired
*/
private int retireIdleWriters() {
ComponentLog log = getLogger();
log.info("Attempting to close idle HiveWriters");
int count = 0;
long now = System.currentTimeMillis();
ArrayList<HiveEndPoint> retirees = new ArrayList<>();
//1) Find retirement candidates
for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
++count;
retirees.add(entry.getKey());
}
}
//2) Retire them
for (HiveEndPoint ep : retirees) {
try {
log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
allWriters.remove(ep).flushAndClose();
} catch (IOException e) {
log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, e);
} catch (InterruptedException e) {
log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, e);
}
}
return count;
}
protected HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions options) throws ConnectionError {
return HiveUtils.makeEndPoint(partitionValues, options);
}
protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
throws HiveWriter.ConnectFailure, InterruptedException {
return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
}
protected KerberosProperties getKerberosProperties() {
return KerberosProperties.create(NiFiProperties.getInstance());
}
protected class HiveStreamingRecord {
private List<String> partitionValues;
private JSONObject record;
public HiveStreamingRecord(List<String> partitionValues, JSONObject record) {
this.partitionValues = partitionValues;
this.record = record;
}
public List<String> getPartitionValues() {
return partitionValues;
}
public JSONObject getRecord() {
return record;
}
}
}
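An illustrative test sketch, not part of this commit, showing how the processor could be driven with NiFi's mock framework. The connection details and Avro payload are placeholders, and a real test would override the protected makeHiveEndPoint/makeHiveWriter factory methods so that no live metastore is needed:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class PutHiveStreamingSketch {

        @Test
        public void routesAvroRecordsToSuccess() throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(PutHiveStreaming.class);
            runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://metastore-host:9083"); // placeholder
            runner.setProperty(PutHiveStreaming.DB_NAME, "default");
            runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
            runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "dt,region");
            runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");

            // The flow file content must be an Avro data file whose records contain dt and region fields.
            runner.enqueue(sampleAvroDataFile());
            runner.run();

            runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS, 1);
        }

        private byte[] sampleAvroDataFile() {
            return new byte[0]; // hypothetical helper: build or load a small Avro data file here
        }
    }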

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
/**
* Exception thrown when Kerberos authentication for Hive fails.
*/
public class AuthenticationFailedException extends Exception {
public AuthenticationFailedException(String reason, Exception cause) {
super(reason, cause);
}
}

View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
* Helper for building Hive Configuration objects, validating Kerberos settings, and managing login and ticket renewal.
*/
public class HiveConfigurator {
private volatile KerberosTicketRenewer renewer;
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
final List<ValidationResult> problems = new ArrayList<>();
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
log.debug("Reloading validation resources");
resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
validationResourceHolder.set(resources);
}
final Configuration hiveConfig = resources.getConfiguration();
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, log));
return problems;
}
public Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hiveConfig = new HiveConf();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
hiveConfig.addResource(new Path(configFile.trim()));
}
}
return hiveConfig;
}
public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod, ComponentLog log) throws AuthenticationFailedException {
UserGroupInformation ugi;
try {
ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
} catch (IOException ioe) {
throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
}
// if we got here then we have a ugi so start a renewer
if (ugi != null) {
final String id = getClass().getSimpleName();
renewer = SecurityUtil.startTicketRenewalThread(id, ugi, ticketRenewalPeriod, log);
}
return ugi;
}
public void stopRenewer() {
if (renewer != null) {
renewer.stop();
}
}
}
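A minimal lifecycle sketch mirroring how HiveConnectionPool and PutHiveStreaming use this class; the config path, principal, keytab and renewal period are placeholder values:

    // Sketch only: returns null on an unsecured cluster, otherwise logs in and starts a ticket renewer.
    UserGroupInformation loginIfSecured(HiveConfigurator configurator, ComponentLog log) throws AuthenticationFailedException {
        Configuration hiveConfig = configurator.getConfigurationFromFiles("/path/to/hive-site.xml"); // placeholder
        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
            // Kerberos login plus a background KerberosTicketRenewer with a 60-second renewal period
            return configurator.authenticate(hiveConfig, "nifi@EXAMPLE.COM", "/path/to/nifi.keytab", 60000L, log);
        }
        return null;
    }
    // When the component is disabled or stopped, the renewer is shut down with configurator.stopRenewer().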

View File

@@ -29,11 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
@@ -347,34 +343,6 @@ public class HiveJdbcCommon {
void processRow(ResultSet resultSet) throws IOException;
}
/**
* Validates that one or more files exist, as specified in a single property.
*/
public static Validator createMultipleFilesExistValidator() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final String[] files = input.split(",");
for (String filename : files) {
try {
final File file = new File(filename.trim());
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not exist or is not a file";
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename + " due to " + e.getMessage();
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
public static Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hiveConfig = new HiveConf();
if (StringUtils.isNotBlank(configFiles)) {

View File

@@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
import java.io.Serializable;
public class HiveOptions implements Serializable {
/**
* Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
*/
private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
protected String databaseName;
protected String tableName;
protected String metaStoreURI;
protected Integer txnsPerBatch = 100;
protected Integer maxOpenConnections = 10;
protected Integer batchSize = 15000;
protected Integer idleTimeout = 60000;
protected Integer callTimeout = 0;
protected Integer heartBeatInterval = 60;
protected Boolean autoCreatePartitions = true;
protected String kerberosPrincipal;
protected String kerberosKeytab;
protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
this.metaStoreURI = metaStoreURI;
this.databaseName = databaseName;
this.tableName = tableName;
}
public HiveOptions withTickTupleInterval(Integer tickInterval) {
this.tickTupleInterval = tickInterval;
return this;
}
public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
this.txnsPerBatch = txnsPerBatch;
return this;
}
public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
this.maxOpenConnections = maxOpenConnections;
return this;
}
public HiveOptions withBatchSize(Integer batchSize) {
this.batchSize = batchSize;
return this;
}
public HiveOptions withIdleTimeout(Integer idleTimeout) {
this.idleTimeout = idleTimeout;
return this;
}
public HiveOptions withCallTimeout(Integer callTimeout) {
this.callTimeout = callTimeout;
return this;
}
public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
return this;
}
public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
this.autoCreatePartitions = autoCreatePartitions;
return this;
}
public HiveOptions withKerberosKeytab(String kerberosKeytab) {
this.kerberosKeytab = kerberosKeytab;
return this;
}
public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
this.kerberosPrincipal = kerberosPrincipal;
return this;
}
public String getMetaStoreURI() {
return metaStoreURI;
}
public String getDatabaseName() {
return databaseName;
}
public String getTableName() {
return tableName;
}
public Integer getBatchSize() {
return batchSize;
}
public Integer getCallTimeOut() {
return callTimeout;
}
public Integer getHeartBeatInterval() {
return heartBeatInterval;
}
public Integer getMaxOpenConnections() {
return maxOpenConnections;
}
public Integer getIdleTimeout() {
return idleTimeout;
}
public Integer getTxnsPerBatch() {
return txnsPerBatch;
}
public Boolean getAutoCreatePartitions() {
return autoCreatePartitions;
}
public String getKerberosPrincipal() {
return kerberosPrincipal;
}
public String getKerberosKeytab() {
return kerberosKeytab;
}
public Integer getTickTupleInterval() {
return tickTupleInterval;
}
}
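The fluent with* methods above are intended to be chained, as PutHiveStreaming.setup() does; a short sketch with placeholder connection details:

    // Placeholder metastore URI, database and table; the Kerberos settings apply only to secured clusters.
    HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "users")
            .withTxnsPerBatch(100)
            .withAutoCreatePartitions(true)
            .withMaxOpenConnections(8)
            .withHeartBeatInterval(60)
            .withKerberosPrincipal("nifi@EXAMPLE.COM")
            .withKerberosKeytab("/path/to/nifi.keytab");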

View File

@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class HiveUtils {
private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
if (partitionVals == null) {
return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
}
return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
}
public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
throws HiveWriter.ConnectFailure, InterruptedException {
return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
options.getCallTimeOut(), callTimeoutPool, ugi);
}
public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
for (Map.Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
LOG.info("cached writers {} ", entry.getValue());
}
}
/**
* Validates that one or more files exist, as specified in a single property.
*/
public static Validator createMultipleFilesExistValidator() {
return (subject, input, context) -> {
final String[] files = input.split("\\s*,\\s*");
for (String filename : files) {
try {
final File file = new File(filename.trim());
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not exist or is not a file";
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename + " due to " + e.getMessage();
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
};
}
}
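A brief sketch of how these helpers combine (compare makeHiveEndPoint/makeHiveWriter in PutHiveStreaming above); the options, thread pool and UGI are assumed to be supplied by the caller:

    // Resolve the endpoint for one record's partition values, then open a writer for it.
    HiveWriter writerFor(List<String> partitionValues, HiveOptions options,
                         ExecutorService callTimeoutPool, UserGroupInformation ugi)
            throws ConnectionError, HiveWriter.ConnectFailure, InterruptedException {
        // A null partition list is allowed for unpartitioned tables (see makeEndPoint above).
        HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionValues, options);
        return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
    }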

View File

@@ -0,0 +1,440 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveWriter {
private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class);
private final HiveEndPoint endPoint;
private final StreamingConnection connection;
private final int txnsPerBatch;
private final RecordWriter recordWriter;
private final ExecutorService callTimeoutPool;
private final long callTimeout;
private final Object txnBatchLock = new Object();
private TransactionBatch txnBatch;
private long lastUsed; // time of last flush on this writer
protected boolean closed; // flag indicating HiveWriter was closed
private boolean autoCreatePartitions;
private UserGroupInformation ugi;
private int totalRecords = 0;
public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
boolean autoCreatePartitions, long callTimeout,
ExecutorService callTimeoutPool, UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
try {
this.autoCreatePartitions = autoCreatePartitions;
this.callTimeout = callTimeout;
this.callTimeoutPool = callTimeoutPool;
this.endPoint = endPoint;
this.ugi = ugi;
this.connection = newConnection(ugi);
this.txnsPerBatch = txnsPerBatch;
this.recordWriter = getRecordWriter(endPoint);
this.txnBatch = nextTxnBatch(recordWriter);
this.closed = false;
this.lastUsed = System.currentTimeMillis();
} catch (InterruptedException | RuntimeException e) {
throw e;
} catch (Exception e) {
throw new ConnectFailure(endPoint, e);
}
}
protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException {
return new StrictJsonWriter(endPoint);
}
@Override
public String toString() {
return "{ "
+ "endPoint = " + endPoint.toString()
+ ", TransactionBatch = " + txnBatch.toString() + " }";
}
/**
* Write the record data to Hive
*
* @throws WriteFailure if the record cannot be written to the current transaction batch
* @throws SerializationError if the record cannot be serialized by the underlying writer
* @throws InterruptedException if the write operation is interrupted
*/
public synchronized void write(final byte[] record)
throws WriteFailure, SerializationError, InterruptedException {
if (closed) {
throw new IllegalStateException("This hive streaming writer was closed " +
"and thus no longer able to write : " + endPoint);
}
// write the tuple
try {
LOG.debug("Writing event to {}", endPoint);
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws StreamingException, InterruptedException {
txnBatch.write(record);
totalRecords++;
return null;
}
});
} catch (SerializationError se) {
throw new SerializationError(endPoint.toString() + " SerializationError", se);
} catch (StreamingException | TimeoutException e) {
throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
}
}
/**
* Commits the current Txn if totalRecords > 0.
* If 'rollToNext' is true, will switch to next Txn in batch or to a
* new TxnBatch if current Txn batch is exhausted
*/
public void flush(boolean rollToNext)
throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
// if there are no records do not call flush
if (totalRecords <= 0) return;
try {
synchronized (txnBatchLock) {
commitTxn();
nextTxn(rollToNext);
totalRecords = 0;
lastUsed = System.currentTimeMillis();
}
} catch (StreamingException e) {
throw new TxnFailure(txnBatch, e);
}
}
/** Queues up a heartbeat request on the current and remaining txns using the
* callTimeoutPool and returns immediately
*/
public void heartBeat() throws InterruptedException {
// 1) schedule the heartbeat on one thread in pool
synchronized (txnBatchLock) {
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
try {
LOG.info("Sending heartbeat on batch " + txnBatch);
txnBatch.heartbeat();
} catch (StreamingException e) {
LOG.warn("Heartbeat error on batch " + txnBatch, e);
}
return null;
}
});
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
// Suppressing exceptions as we don't care for errors on heartbeats
}
}
}
/**
* Returns totalRecords written so far in a transaction
* @return totalRecords
*/
public int getTotalRecords() {
return totalRecords;
}
/**
* Flush and Close current transactionBatch.
*/
public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure,
IOException, InterruptedException {
flush(false);
close();
}
/**
* Close the Transaction Batch and connection
* @throws IOException if an error occurs during close
* @throws InterruptedException if the close operation is interrupted
*/
public void close() throws IOException, InterruptedException {
closeTxnBatch();
closeConnection();
closed = true;
}
protected void closeConnection() throws InterruptedException {
LOG.info("Closing connection to end point : {}", endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
connection.close(); // could block
return null;
}
});
} catch (Exception e) {
LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
// Suppressing exceptions as we don't care for errors on connection close
}
}
protected void commitTxn() throws CommitFailure, InterruptedException {
LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId(), endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
txnBatch.commit(); // could block
return null;
}
});
} catch (StreamingException | TimeoutException e) {
throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
}
}
protected StreamingConnection newConnection(final UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
try {
return callWithTimeout(() -> {
return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
});
} catch (StreamingException | TimeoutException e) {
throw new ConnectFailure(endPoint, e);
}
}
protected TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
throws InterruptedException, TxnBatchFailure {
LOG.debug("Fetching new Txn Batch for {}", endPoint);
TransactionBatch batch = null;
try {
batch = callWithTimeout(() -> {
return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
});
batch.beginNextTransaction();
LOG.debug("Acquired {}. Switching to first txn", batch);
} catch (TimeoutException | StreamingException e) {
throw new TxnBatchFailure(endPoint, e);
}
return batch;
}
protected void closeTxnBatch() throws InterruptedException {
try {
LOG.debug("Closing Txn Batch {}", txnBatch);
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
if (txnBatch != null) {
txnBatch.close(); // could block
}
return null;
}
});
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
LOG.warn("Error closing txn batch " + txnBatch, e);
}
}
/**
* Aborts the current Txn and switches to next Txn.
* @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
*/
public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
synchronized (txnBatchLock) {
abortTxn();
nextTxn(true); // roll to next
}
}
/**
* Aborts current Txn in the txnBatch.
*/
protected void abortTxn() throws InterruptedException {
LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws StreamingException, InterruptedException {
txnBatch.abort(); // could block
return null;
}
});
} catch (InterruptedException e) {
throw e;
} catch (TimeoutException e) {
LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
} catch (Exception e) {
LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
// Suppressing exceptions as we don't care for errors on abort
}
}
/**
* if there are remainingTransactions in current txnBatch, begins nextTransactions
* otherwise creates new txnBatch.
* @param rollToNext Whether to roll to the next transaction batch
*/
protected void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
if (txnBatch.remainingTransactions() == 0) {
closeTxnBatch();
txnBatch = null;
if (rollToNext) {
txnBatch = nextTxnBatch(recordWriter);
}
} else if (rollToNext) {
LOG.debug("Switching to next Txn for {}", endPoint);
txnBatch.beginNextTransaction(); // does not block
}
}
/**
* Checks (and clears) the current thread's interrupt status and throws an
* exception if it was set.
* @throws InterruptedException if the current thread has been interrupted
*/
protected static void checkAndThrowInterruptedException()
throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException("Timed out before Hive call was made. "
+ "Your callTimeout might be set too low or Hive calls are "
+ "taking too long.");
}
}
/**
* Executes the callable on a separate thread and waits for completion for up to
* the configured call timeout in milliseconds. In case of timeout, the callable
* is cancelled and a TimeoutException is thrown.
*/
private <T> T callWithTimeout(final CallRunner<T> callRunner)
throws TimeoutException, StreamingException, InterruptedException {
Future<T> future = callTimeoutPool.submit(callRunner::call);
try {
if (callTimeout > 0) {
return future.get(callTimeout, TimeUnit.MILLISECONDS);
} else {
return future.get();
}
} catch (TimeoutException eT) {
future.cancel(true);
throw eT;
} catch (ExecutionException e1) {
Throwable cause = e1.getCause();
if (cause instanceof IOException) {
throw new StreamingIOFailure("I/O Failure", (IOException) cause);
} else if (cause instanceof StreamingException) {
throw (StreamingException) cause;
} else if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof TimeoutException) {
throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
} else {
throw new RuntimeException(e1);
}
}
}
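// Illustrative use of callWithTimeout (an editor's sketch, mirroring the commit/connect
// paths above): potentially blocking Hive calls are wrapped so a hung metastore cannot
// stall the processor thread indefinitely, e.g.
//
//     callWithTimeout(() -> { txnBatch.write(record); return null; });
//
// On timeout the Future is cancelled (interrupting the worker thread) and the
// TimeoutException propagates to the caller.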
public long getLastUsed() {
return lastUsed;
}
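/**
 * Builds a simple comma-delimited byte representation of the given tuple
 * (each field followed by a comma, using the platform default charset).
 */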
private byte[] generateRecord(List<String> tuple) {
StringBuilder buf = new StringBuilder();
for (String o : tuple) {
buf.append(o);
buf.append(",");
}
return buf.toString().getBytes();
}
/**
* Simple interface whose <tt>call</tt> method is invoked by
* {@link #callWithTimeout(CallRunner)} on a separate thread from the
* call-timeout pool.
* @param <T> the type of object returned from the call
*/
private interface CallRunner<T> {
T call() throws Exception;
}
public static class Failure extends Exception {
public Failure(String message, Throwable cause) {
super(message, cause);
}
}
public static class WriteFailure extends Failure {
public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
}
}
public static class CommitFailure extends Failure {
public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
}
}
public static class ConnectFailure extends Failure {
public ConnectFailure(HiveEndPoint ep, Throwable cause) {
super("Failed connecting to EndPoint " + ep, cause);
}
}
public static class TxnBatchFailure extends Failure {
public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
}
}
public static class TxnFailure extends Failure {
public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
}
}
}
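// Illustrative call sequence for this writer (assumed from the processor and tests in this commit):
//
//     HiveWriter writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
//     writer.write(recordBytes);   // may throw WriteFailure or SerializationError
//     writer.flush(true);          // commit the current Txn and roll to the next
//     writer.flushAndClose();      // when the writer is retired or the processor stops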

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.hive;
import org.apache.hadoop.conf.Configuration;
/**
* A helper class for holding a loaded configuration together with the resource string it was
* built from, so callers can avoid reloading the configuration unless the resources change
*/
public class ValidationResources {
private final String configResources;
private final Configuration configuration;
public ValidationResources(String configResources, Configuration configuration) {
this.configResources = configResources;
this.configuration = configuration;
}
public String getConfigResources() {
return configResources;
}
public Configuration getConfiguration() {
return configuration;
}
}
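// Illustrative caller pattern (an assumption, not part of this class): cache the parsed
// Configuration keyed by the resource string and rebuild it only when the configured
// resources change, e.g.
//
//     if (validationResources == null
//             || !configFiles.equals(validationResources.getConfigResources())) {
//         validationResources = new ValidationResources(configFiles, loadConfiguration(configFiles));
//     }
//     Configuration conf = validationResources.getConfiguration();
//
// where loadConfiguration(...) stands in for whatever helper actually parses the files.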

View File

@ -15,3 +15,4 @@
org.apache.nifi.processors.hive.SelectHiveQL
org.apache.nifi.processors.hive.PutHiveQL
org.apache.nifi.processors.hive.ConvertAvroToORC
org.apache.nifi.processors.hive.PutHiveStreaming

View File

@ -0,0 +1,588 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit tests for PutHiveStreaming processor.
*/
public class TestPutHiveStreaming {
TestRunner runner;
MockPutHiveStreaming processor;
private KerberosProperties kerberosPropsWithFile;
private KerberosProperties kerberosPropsWithoutFile;
@Before
public void setUp() throws Exception {
// needed for calls to UserGroupInformation.setConfiguration() to work when passing in
// config with Kerberos authentication enabled
System.setProperty("java.security.krb5.realm", "nifi.com");
System.setProperty("java.security.krb5.kdc", "nifi.kdc");
NiFiProperties niFiPropertiesWithKerberos = mock(NiFiProperties.class);
when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(new File("src/test/resources/krb5.conf"));
kerberosPropsWithFile = KerberosProperties.create(niFiPropertiesWithKerberos);
NiFiProperties niFiPropertiesWithoutKerberos = mock(NiFiProperties.class);
when(niFiPropertiesWithoutKerberos.getKerberosConfigurationFile()).thenReturn(null);
kerberosPropsWithoutFile = KerberosProperties.create(niFiPropertiesWithoutKerberos);
processor = new MockPutHiveStreaming();
processor.setKerberosProperties(kerberosPropsWithFile);
runner = TestRunners.newTestRunner(processor);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testSetup() throws Exception {
runner.setValidateExpressionUsage(false);
runner.assertNotValid();
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.assertNotValid();
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.assertValid();
runner.run();
}
@Test
public void testSetupBadPartitionColumns() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.assertValid();
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number,,");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
runner.assertNotValid();
}
@Test(expected = AssertionError.class)
public void testSetupWithKerberosAuthFailed() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.HIVE_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml, src/test/resources/hive-site-security.xml");
runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.run();
}
@Test
public void onTrigger() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS);
}
@Test
public void onTriggerWithPartitionColumns() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
put("favorite_color", "blue");
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS);
}
@Test
public void onTriggerWithRetireWriters() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
for (int i = 0; i < 10; i++) {
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
}
runner.run(10);
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS);
}
@Test
public void onTriggerWithHeartbeat() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.HEARTBEAT_INTERVAL, "1");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(1, false);
// Wait for a heartbeat
Thread.sleep(1000);
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run(1, true);
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_SUCCESS);
}
@Test
public void onTriggerWithConnectFailure() throws Exception {
processor.setGenerateConnectFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY);
}
@Test
public void onTriggerWithInterruptedException() throws Exception {
processor.setGenerateInterruptedExceptionOnCreateWriter(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_RETRY);
}
@Test
public void onTriggerWithWriteFailure() throws Exception {
processor.setGenerateWriteFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE);
}
@Test
public void onTriggerWithSerializationError() throws Exception {
processor.setGenerateSerializationError(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE);
}
@Test
public void onTriggerWithCommitFailure() throws Exception {
processor.setGenerateCommitFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE);
}
@Test
public void onTriggerWithTransactionFailure() throws Exception {
processor.setGenerateTransactionFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveStreaming.REL_FAILURE);
}
@Test
public void onTriggerWithExceptionOnFlushAndClose() throws Exception {
processor.setGenerateExceptionOnFlushAndClose(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
put("favorite_number", 146);
}
};
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
}
@Test
public void cleanup() throws Exception {
}
@Test
public void flushAllWriters() throws Exception {
}
@Test
public void abortAndCloseWriters() throws Exception {
}
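/**
 * Serializes the given user maps into an Avro data file, using the schema at
 * src/test/resources/user.avsc, and returns the file contents as a byte array.
 */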
private byte[] createAvroRecord(List<Map<String, Object>> records) throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
List<GenericRecord> users = new LinkedList<>();
for (Map<String, Object> record : records) {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", record.get("name"));
user.put("favorite_number", record.get("favorite_number"));
user.put("favorite_color", record.get("favorite_color"));
users.add(user);
}
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, out);
for (final GenericRecord user : users) {
dataFileWriter.append(user);
}
}
return out.toByteArray();
}
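/**
 * PutHiveStreaming subclass that substitutes mocked Hive endpoints and writers so the
 * failure paths above can be exercised without a live Hive metastore.
 */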
private class MockPutHiveStreaming extends PutHiveStreaming {
private KerberosProperties kerberosProperties;
private boolean generateConnectFailure = false;
private boolean generateInterruptedExceptionOnCreateWriter = false;
private boolean generateWriteFailure = false;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
@Override
public KerberosProperties getKerberosProperties() {
return this.kerberosProperties;
}
public void setKerberosProperties(KerberosProperties kerberosProperties) {
this.kerberosProperties = kerberosProperties;
}
@Override
public HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions hiveOptions) {
HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
return hiveEndPoint;
}
@Override
protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
throws HiveWriter.ConnectFailure, InterruptedException {
if (generateConnectFailure) {
throw new HiveWriter.ConnectFailure(endPoint, new Exception());
}
if (generateInterruptedExceptionOnCreateWriter) {
throw new InterruptedException();
}
MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi);
hiveWriter.setGenerateWriteFailure(generateWriteFailure);
hiveWriter.setGenerateSerializationError(generateSerializationError);
hiveWriter.setGenerateCommitFailure(generateCommitFailure);
hiveWriter.setGenerateTransactionFailure(generateTransactionFailure);
hiveWriter.setGenerateExceptionOnFlushAndClose(generateExceptionOnFlushAndClose);
return hiveWriter;
}
public void setGenerateConnectFailure(boolean generateConnectFailure) {
this.generateConnectFailure = generateConnectFailure;
}
public void setGenerateInterruptedExceptionOnCreateWriter(boolean generateInterruptedExceptionOnCreateWriter) {
this.generateInterruptedExceptionOnCreateWriter = generateInterruptedExceptionOnCreateWriter;
}
public void setGenerateWriteFailure(boolean generateWriteFailure) {
this.generateWriteFailure = generateWriteFailure;
}
public void setGenerateSerializationError(boolean generateSerializationError) {
this.generateSerializationError = generateSerializationError;
}
public void setGenerateCommitFailure(boolean generateCommitFailure) {
this.generateCommitFailure = generateCommitFailure;
}
public void setGenerateTransactionFailure(boolean generateTransactionFailure) {
this.generateTransactionFailure = generateTransactionFailure;
}
public void setGenerateExceptionOnFlushAndClose(boolean generateExceptionOnFlushAndClose) {
this.generateExceptionOnFlushAndClose = generateExceptionOnFlushAndClose;
}
}
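/**
 * HiveWriter stub whose write/flush/flushAndClose methods can be configured to throw
 * the various failure types asserted in the tests above.
 */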
private class MockHiveWriter extends HiveWriter {
private boolean generateWriteFailure = false;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
private HiveEndPoint endPoint;
public MockHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions,
long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi);
this.endPoint = endPoint;
}
@Override
public synchronized void write(byte[] record) throws WriteFailure, SerializationError, InterruptedException {
if (generateWriteFailure) {
throw new HiveWriter.WriteFailure(endPoint, 1L, new Exception());
}
if (generateSerializationError) {
throw new SerializationError("Test Serialization Error", new Exception());
}
}
public void setGenerateWriteFailure(boolean generateWriteFailure) {
this.generateWriteFailure = generateWriteFailure;
}
public void setGenerateSerializationError(boolean generateSerializationError) {
this.generateSerializationError = generateSerializationError;
}
public void setGenerateCommitFailure(boolean generateCommitFailure) {
this.generateCommitFailure = generateCommitFailure;
}
public void setGenerateTransactionFailure(boolean generateTransactionFailure) {
this.generateTransactionFailure = generateTransactionFailure;
}
public void setGenerateExceptionOnFlushAndClose(boolean generateExceptionOnFlushAndClose) {
this.generateExceptionOnFlushAndClose = generateExceptionOnFlushAndClose;
}
@Override
protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException {
return mock(RecordWriter.class);
}
@Override
protected StreamingConnection newConnection(UserGroupInformation ugi) throws InterruptedException, ConnectFailure {
StreamingConnection connection = mock(StreamingConnection.class);
return connection;
}
@Override
public void flush(boolean rollToNext) throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
if (generateCommitFailure) {
throw new HiveWriter.CommitFailure(endPoint, 1L, new Exception());
}
if (generateTransactionFailure) {
throw new HiveWriter.TxnFailure(mock(TransactionBatch.class), new Exception());
}
}
@Override
public void heartBeat() throws InterruptedException {
}
@Override
public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure, IOException, InterruptedException {
if (generateExceptionOnFlushAndClose) {
throw new IOException();
}
}
@Override
public void close() throws IOException, InterruptedException {
}
@Override
public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
}
@Override
protected void closeConnection() throws InterruptedException {
// TODO
}
@Override
protected void commitTxn() throws CommitFailure, InterruptedException {
// TODO
}
@Override
protected TransactionBatch nextTxnBatch(RecordWriter recordWriter) throws InterruptedException, TxnBatchFailure {
TransactionBatch txnBatch = mock(TransactionBatch.class);
return txnBatch;
}
@Override
protected void closeTxnBatch() throws InterruptedException {
// TODO
}
@Override
protected void abortTxn() throws InterruptedException {
// TODO
}
@Override
protected void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
// TODO
}
}
}

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hive</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hive</value>
</property>
</configuration>

View File

@ -0,0 +1,26 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hive</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>KERBEROS</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hive</value>
</property>
</configuration>

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}

View File

@ -32,4 +32,19 @@
<module>nifi-hive-nar</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/fake.keytab</exclude>
<exclude>src/test/resources/krb5.conf</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>