NIFI-4696 - Add Flowfile attribute EL support and per-table concurrency to PutHiveStreaming

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2342.
Matthew Burgess 2017-12-14 17:19:25 -05:00 committed by Pierre Villard
parent 1ee8d16a21
commit febb119fac
1 changed file with 144 additions and 80 deletions
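A quick sketch of the per-table concurrency model this commit introduces: the processor-wide writer map (and the @TriggerSerially restriction) is replaced by a per-task writer map, and each onTrigger task claims a single-permit Semaphore keyed by "dbName.tableName" with a non-blocking tryAcquire; if the table is already being written to, the task rolls back its session and tries again on a later trigger. The standalone Java sketch below illustrates only that gating pattern; the class and method names (PerTableGate, tryClaim, release) are illustrative and do not appear in the processor itself.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical helper illustrating the per-table gate used in onTrigger in the diff below.
public class PerTableGate {

    // One single-permit semaphore per "db.table" key, created on first use.
    private final Map<String, Semaphore> tableSemaphores = new ConcurrentHashMap<>();

    // Non-blocking claim: returns false immediately if another thread holds the table.
    public boolean tryClaim(String dbName, String tableName) {
        Semaphore semaphore = tableSemaphores.computeIfAbsent(dbName + "." + tableName, k -> new Semaphore(1));
        return semaphore.tryAcquire();
    }

    // Release the table once the write has finished (success or failure).
    public void release(String dbName, String tableName) {
        Semaphore semaphore = tableSemaphores.get(dbName + "." + tableName);
        if (semaphore != null) {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        PerTableGate gate = new PerTableGate();
        if (gate.tryClaim("default", "events")) {
            try {
                // ... write to default.events ...
            } finally {
                gate.release("default", "events");
            }
        } else {
            // Table busy: back off (in the processor, the session is rolled back and retried later).
        }
    }
}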


@@ -32,7 +32,6 @@ import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -79,8 +78,10 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -91,11 +92,12 @@ import java.util.regex.Pattern;
 /**
  * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
  */
-@TriggerSerially
 @Tags({"hive", "streaming", "put", "database", "store"})
 @CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
         + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
-        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
+        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. NOTE: If "
+        + "multiple concurrent tasks are configured for this processor, only one table can be written to at any time by a single thread. Additional tasks "
+        + "intending to write to the same table will wait for the current task to finish writing to the table.")
 @WritesAttributes({
         @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
                 + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."),
@@ -108,6 +110,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     // Attributes
     public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
 
+    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
+
     // Validators
     private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
         if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
@@ -156,7 +160,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .displayName("Hive Configuration Resources")
             .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
-                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
+                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
+                    + "Please see the Hive documentation for more details.")
             .required(false)
             .addValidator(HiveUtils.createMultipleFilesExistValidator())
             .build();
@@ -214,11 +220,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .name("hive-stream-heartbeat-interval")
             .displayName("Heartbeat Interval")
             .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
-                    + "A value of 0 indicates that no heartbeat should be sent.")
+                    + "A value of 0 indicates that no heartbeat should be sent. "
+                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
             .defaultValue("60")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
@@ -241,6 +248,17 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .defaultValue("10000")
             .build();
 
+    public static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("hive-stream-call-timeout")
+            .displayName("Call Timeout")
+            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
+                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     public static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
             "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
                     " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
@@ -280,11 +298,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
 
-    protected HiveOptions options;
+    protected volatile int callTimeout;
     protected ExecutorService callTimeoutPool;
 
     protected transient Timer heartBeatTimer;
 
-    protected Map<HiveEndPoint, HiveWriter> allWriters = Collections.emptyMap();
+    protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> threadWriterList = new ConcurrentLinkedQueue<>();
+    protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap = new ConcurrentHashMap<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -299,6 +318,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         props.add(HEARTBEAT_INTERVAL);
         props.add(TXNS_PER_BATCH);
         props.add(RECORDS_PER_TXN);
+        props.add(CALL_TIMEOUT);
         props.add(ROLLBACK_ON_FAILURE);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
@@ -329,16 +349,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     public void setup(final ProcessContext context) {
         ComponentLog log = getLogger();
-        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
-        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
-        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
-        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
-        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
-        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions().asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
 
         final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
         hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
 
+        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
+        if(context.getMaxConcurrentTasks() > 1) {
+            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
+        }
+
         // add any dynamic properties to the Hive configuration
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
@@ -347,12 +366,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             }
         }
 
-        options = new HiveOptions(metastoreUri, dbName, tableName)
-                .withTxnsPerBatch(txnsPerBatch)
-                .withAutoCreatePartitions(autoCreatePartitions)
-                .withMaxOpenConnections(maxConnections)
-                .withHeartBeatInterval(heartbeatInterval);
-
         hiveConfigurator.preload(hiveConfig);
 
         if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
@@ -366,19 +379,18 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
             }
             log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
-            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
         } else {
             ugi = null;
         }
 
-        allWriters = new ConcurrentHashMap<>();
+        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
         String timeoutName = "put-hive-streaming-%d";
         this.callTimeoutPool = Executors.newFixedThreadPool(1,
                 new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
         sendHeartBeat.set(true);
         heartBeatTimer = new Timer();
-        setupHeartBeatTimer();
+        setupHeartBeatTimer(heartbeatInterval);
     }
 
     private static class FunctionContext extends RollbackOnFailure {
@@ -514,7 +526,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         }
     }
 
-    private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session) {
+    private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
         return RollbackOnFailure.createOnError((fc, input, res, e) -> {
 
             if (res.penalty() == ErrorTypes.Penalty.Yield) {
@@ -530,15 +542,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
                 case Retry:
                     // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + e, e);
 
                 case Self:
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     break;
 
                 default:
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     if (e instanceof ProcessException) {
                         throw (ProcessException) e;
                     } else {
@@ -548,12 +560,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         });
     }
 
-    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session) {
-        return (fc, input, res, e) -> onHiveRecordsError(context, session).apply(fc, Collections.singletonList(input), res, e);
+    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
+        return (fc, input, res, e) -> onHiveRecordsError(context, session, writers).apply(fc, Collections.singletonList(input), res, e);
     }
 
-    private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session) {
-        return (fc, input, res, e) -> onHiveRecordError(context, session).apply(fc, new HiveStreamingRecord(null, input), res, e);
+    private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
+        return (fc, input, res, e) -> onHiveRecordError(context, session, writers).apply(fc, new HiveStreamingRecord(null, input), res, e);
     }
 
     @Override
@@ -568,9 +580,55 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             return;
         }
 
+        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // Only allow one thread to work on a DB/table at a time
+        final Semaphore newSemaphore = new Semaphore(1);
+        Semaphore semaphore = tableSemaphoreMap.putIfAbsent(dbName + "." + tableName, newSemaphore);
+        if (semaphore == null) {
+            semaphore = newSemaphore;
+        }
+
+        boolean gotSemaphore = false;
+        try {
+            gotSemaphore = semaphore.tryAcquire(0, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            // Nothing to do, gotSemaphore defaults to false
+        }
+        if (!gotSemaphore) {
+            // We didn't get a chance to acquire, so rollback the session and try again next time
+            session.rollback();
+            return;
+        }
+
         final ComponentLog log = getLogger();
+        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
         final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
 
+        final Map<HiveEndPoint, HiveWriter> myWriters = new ConcurrentHashMap<>();
+        threadWriterList.add(myWriters);
+
+        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
+                .withTxnsPerBatch(txnsPerBatch)
+                .withAutoCreatePartitions(autoCreatePartitions)
+                .withMaxOpenConnections(maxConnections)
+                .withHeartBeatInterval(heartbeatInterval)
+                .withCallTimeout(callTimeout);
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            o = o.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+        }
+
+        final HiveOptions options = o;
+
         // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
         ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -673,7 +731,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         }
                         partitionValues.add(partitionValue.toString());
                     }
-                }, onRecordError(context, session))) {
+                }, onRecordError(context, session, myWriters))) {
                     continue;
                 }
 
@@ -684,13 +742,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
                 if (!exceptionHandler.execute(functionContext, record, input -> {
                     final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
-                    final HiveWriter hiveWriter = getOrCreateWriter(endPoint);
+                    final HiveWriter hiveWriter = getOrCreateWriter(myWriters, options, endPoint);
                     hiveWriterRef.set(hiveWriter);
 
                     hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
                     successfulRecords.get().add(record);
 
-                }, onHiveRecordError(context, session))) {
+                }, onHiveRecordError(context, session, myWriters))) {
                     continue;
                 }
@@ -706,7 +764,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         // Now send the records to the success relationship and update the success count
                         flushSuccessfulRecords.run();
 
-                    }, onHiveRecordsError(context, session).andThen((fc, input, res, commitException) -> {
+                    }, onHiveRecordsError(context, session, myWriters).andThen((fc, input, res, commitException) -> {
                         // Reset hiveWriter for succeeding records.
                         switch (res.destination()) {
                             case Retry:
@@ -725,14 +783,14 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
             exceptionHandler.execute(functionContext, successfulRecords.get(), input -> {
                 // Finish any transactions
-                flushAllWriters(true);
-                closeAllWriters();
+                flushAllWriters(myWriters, true);
+                closeAllWriters(myWriters);
 
                 // Now send any remaining records to the success relationship and update the count
                 flushSuccessfulRecords.run();
 
                 // Append successfulRecords on failure.
-            }, onHiveRecordsError(context, session));
+            }, onHiveRecordsError(context, session, myWriters));
 
         } catch (IOException ioe) {
             // The Avro file is invalid (or may not be an Avro file at all), send it to failure
@@ -767,9 +825,11 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             result.routeTo(flowFile, REL_RETRY);
 
         } finally {
+            threadWriterList.remove(myWriters);
             functionContext.transferFlowFiles(session, result, options);
 
             // Restore original class loader, might not be necessary but is good practice since the processor task changed it
             Thread.currentThread().setContextClassLoader(originalClassloader);
+            semaphore.release();
         }
     }
@@ -777,24 +837,26 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     public void cleanup() {
         ComponentLog log = getLogger();
         sendHeartBeat.set(false);
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
-            try {
-                HiveWriter w = entry.getValue();
-                w.flushAndClose();
-            } catch (Exception ex) {
-                log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
-                if (ex instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
+        for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) {
+            for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+                try {
+                    HiveWriter w = entry.getValue();
+                    w.flushAndClose();
+                } catch (Exception ex) {
+                    log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
+                    if (ex instanceof InterruptedException) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
+            allWriters.clear();
         }
-        allWriters = Collections.emptyMap();
 
         if (callTimeoutPool != null) {
             callTimeoutPool.shutdown();
             try {
                 while (!callTimeoutPool.isTerminated()) {
-                    callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
                 }
             } catch (Throwable t) {
                 log.warn("shutdown interrupted on " + callTimeoutPool, t);
@@ -806,8 +868,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         hiveConfigurator.stopRenewer();
     }
 
-    private void setupHeartBeatTimer() {
-        if (options.getHeartBeatInterval() > 0) {
+    private void setupHeartBeatTimer(int heartbeatInterval) {
+        if (heartbeatInterval > 0) {
             final ComponentLog log = getLogger();
             heartBeatTimer.schedule(new TimerTask() {
                 @Override
@@ -816,33 +878,35 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         if (sendHeartBeat.get()) {
                             log.debug("Start sending heartbeat on all writers");
                             sendHeartBeatOnAllWriters();
-                            setupHeartBeatTimer();
+                            setupHeartBeatTimer(heartbeatInterval);
                         }
                     } catch (Exception e) {
                         log.warn("Failed to heartbeat on HiveWriter ", e);
                     }
                 }
-            }, options.getHeartBeatInterval() * 1000);
+            }, heartbeatInterval * 1000);
         }
     }
 
     private void sendHeartBeatOnAllWriters() throws InterruptedException {
-        for (HiveWriter writer : allWriters.values()) {
-            writer.heartBeat();
+        for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) {
+            for (HiveWriter writer : allWriters.values()) {
+                writer.heartBeat();
+            }
         }
     }
 
-    private void flushAllWriters(boolean rollToNext)
+    private void flushAllWriters(Map<HiveEndPoint, HiveWriter> writers, boolean rollToNext)
             throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
-        for (HiveWriter writer : allWriters.values()) {
+        for (HiveWriter writer : writers.values()) {
             writer.flush(rollToNext);
         }
     }
 
-    private void abortAndCloseWriters() {
+    private void abortAndCloseWriters(Map<HiveEndPoint, HiveWriter> writers) {
         try {
-            abortAllWriters();
-            closeAllWriters();
+            abortAllWriters(writers);
+            closeAllWriters(writers);
         } catch (Exception ie) {
             getLogger().warn("unable to close hive connections. ", ie);
         }
@@ -851,8 +915,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
     * Abort current Txn on all writers
     */
-    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+    private void abortAllWriters(Map<HiveEndPoint, HiveWriter> writers) throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
             try {
                 entry.getValue().abort();
             } catch (Exception e) {
@@ -864,9 +928,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
     * Closes all writers and remove them from cache
     */
-    private void closeAllWriters() {
+    private void closeAllWriters(Map<HiveEndPoint, HiveWriter> writers) {
         //1) Retire writers
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
             try {
                 entry.getValue().close();
             } catch (Exception e) {
@@ -874,25 +938,25 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             }
         }
         //2) Clear cache
-        allWriters.clear();
+        writers.clear();
     }
 
-    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
+    private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> writers, HiveOptions options, HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
         ComponentLog log = getLogger();
         try {
-            HiveWriter writer = allWriters.get(endPoint);
+            HiveWriter writer = writers.get(endPoint);
             if (writer == null) {
                 log.debug("Creating Writer to Hive end point : " + endPoint);
                 writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
-                if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
-                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{allWriters.size(), options.getMaxOpenConnections()});
-                    int retired = retireIdleWriters();
+                if (writers.size() > (options.getMaxOpenConnections() - 1)) {
+                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{writers.size(), options.getMaxOpenConnections()});
+                    int retired = retireIdleWriters(writers, options.getIdleTimeout());
                     if (retired == 0) {
-                        retireEldestWriter();
+                        retireEldestWriter(writers);
                     }
                 }
-                allWriters.put(endPoint, writer);
-                HiveUtils.logAllHiveEndPoints(allWriters);
+                writers.put(endPoint, writer);
+                HiveUtils.logAllHiveEndPoints(writers);
             }
             return writer;
         } catch (HiveWriter.ConnectFailure e) {
@@ -904,13 +968,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
     * Locate writer that has not been used for longest time and retire it
     */
-    private void retireEldestWriter() {
+    private void retireEldestWriter(Map<HiveEndPoint, HiveWriter> writers) {
         ComponentLog log = getLogger();
 
         log.info("Attempting close eldest writers");
         long oldestTimeStamp = System.currentTimeMillis();
         HiveEndPoint eldest = null;
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
             if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                 eldest = entry.getKey();
                 oldestTimeStamp = entry.getValue().getLastUsed();
@@ -918,7 +982,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         }
         try {
             log.info("Closing least used Writer to Hive end point : " + eldest);
-            allWriters.remove(eldest).flushAndClose();
+            writers.remove(eldest).flushAndClose();
         } catch (IOException e) {
             log.warn("Failed to close writer for end point: " + eldest, e);
         } catch (InterruptedException e) {
@@ -934,7 +998,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     *
     * @return number of writers retired
     */
-    private int retireIdleWriters() {
+    private int retireIdleWriters(Map<HiveEndPoint, HiveWriter> writers, int idleTimeout) {
         ComponentLog log = getLogger();
 
         log.info("Attempting to close idle HiveWriters");
@@ -943,8 +1007,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         ArrayList<HiveEndPoint> retirees = new ArrayList<>();
 
         //1) Find retirement candidates
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
-            if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
+            if (now - entry.getValue().getLastUsed() > idleTimeout) {
                 ++count;
                 retirees.add(entry.getKey());
             }
@@ -953,7 +1017,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         for (HiveEndPoint ep : retirees) {
             try {
                 log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
-                allWriters.remove(ep).flushAndClose();
+                writers.remove(ep).flushAndClose();
             } catch (IOException e) {
                 log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, e);
             } catch (InterruptedException e) {