From febb119faca57e8c9deaa9530434e93e2286d7de Mon Sep 17 00:00:00 2001
From: Matthew Burgess
Date: Thu, 14 Dec 2017 17:19:25 -0500
Subject: [PATCH] NIFI-4696 - Add Flowfile attribute EL support and per-table
 concurrency to PutHiveStreaming

Signed-off-by: Pierre Villard

This closes #2342.
---
 .../processors/hive/PutHiveStreaming.java     | 224 +++++++++++-------
 1 file changed, 144 insertions(+), 80 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index bc11bf5000..cc241e0072 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -32,7 +32,6 @@ import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -79,8 +78,10 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -91,11 +92,12 @@ import java.util.regex.Pattern;
 /**
  * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
  */
-@TriggerSerially
 @Tags({"hive", "streaming", "put", "database", "store"})
 @CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
         + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
-        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
+        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. NOTE: If "
+        + "multiple concurrent tasks are configured for this processor, only one table can be written to at any time by a single thread. Additional tasks "
+        + "intending to write to the same table will wait for the current task to finish writing to the table.")
 @WritesAttributes({
         @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
                 + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."),
@@ -108,6 +110,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     // Attributes
     public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
 
+    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
+
     // Validators
     private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
         if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
@@ -156,7 +160,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .displayName("Hive Configuration Resources")
             .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
-                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
+                    + "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
+                    + "Please see the Hive documentation for more details.")
             .required(false)
             .addValidator(HiveUtils.createMultipleFilesExistValidator())
             .build();
@@ -214,11 +220,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .name("hive-stream-heartbeat-interval")
             .displayName("Heartbeat Interval")
             .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
-                    + "A value of 0 indicates that no heartbeat should be sent.")
+                    + "A value of 0 indicates that no heartbeat should be sent. "
+                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
             .defaultValue("60")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
@@ -241,6 +248,17 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .defaultValue("10000")
             .build();
 
+    public static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("hive-stream-call-timeout")
+            .displayName("Call Timeout")
+            .description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
+                    + "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     public static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
             "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed,"
                     + " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
@@ -280,11 +298,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
     protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
 
-    protected HiveOptions options;
+    protected volatile int callTimeout;
     protected ExecutorService callTimeoutPool;
     protected transient Timer heartBeatTimer;
-    protected Map<HiveEndPoint, HiveWriter> allWriters = Collections.emptyMap();
+    protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> threadWriterList = new ConcurrentLinkedQueue<>();
+    protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap = new ConcurrentHashMap<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -299,6 +318,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         props.add(HEARTBEAT_INTERVAL);
         props.add(TXNS_PER_BATCH);
         props.add(RECORDS_PER_TXN);
+        props.add(CALL_TIMEOUT);
         props.add(ROLLBACK_ON_FAILURE);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
@@ -329,16 +349,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     public void setup(final ProcessContext context) {
         ComponentLog log = getLogger();
 
-        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
-        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
-        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
-        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
-        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
-        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions().asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
         final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
         hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
 
+        // If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
+        if(context.getMaxConcurrentTasks() > 1) {
+            hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
+        }
+
         // add any dynamic properties to the Hive configuration
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
@@ -347,12 +366,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             }
         }
 
-        options = new HiveOptions(metastoreUri, dbName, tableName)
-                .withTxnsPerBatch(txnsPerBatch)
-                .withAutoCreatePartitions(autoCreatePartitions)
-                .withMaxOpenConnections(maxConnections)
-                .withHeartBeatInterval(heartbeatInterval);
-
         hiveConfigurator.preload(hiveConfig);
 
         if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
@@ -366,19 +379,18 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
             }
             log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
-            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
         } else {
             ugi = null;
         }
 
-        allWriters = new ConcurrentHashMap<>();
+        callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
         String timeoutName = "put-hive-streaming-%d";
         this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
         sendHeartBeat.set(true);
         heartBeatTimer = new Timer();
-        setupHeartBeatTimer();
+        setupHeartBeatTimer(heartbeatInterval);
     }
 
     private static class FunctionContext extends RollbackOnFailure {
@@ -514,7 +526,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         }
     }
 
-    private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session) {
+    private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
         return RollbackOnFailure.createOnError((fc, input, res, e) -> {
 
             if (res.penalty() == ErrorTypes.Penalty.Yield) {
@@ -530,15 +542,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                 case Retry:
                     // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + e, e);
 
                 case Self:
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     break;
 
                 default:
-                    abortAndCloseWriters();
+                    abortAndCloseWriters(writers);
                     if (e instanceof ProcessException) {
                         throw (ProcessException) e;
                     } else {
@@ -548,12 +560,12 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         });
     }
 
-    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session) {
-        return (fc, input, res, e) -> onHiveRecordsError(context, session).apply(fc, Collections.singletonList(input), res, e);
+    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
+        return (fc, input, res, e) -> onHiveRecordsError(context, session, writers).apply(fc, Collections.singletonList(input), res, e);
     }
 
-    private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session) {
-        return (fc, input, res, e) -> onHiveRecordError(context, session).apply(fc, new HiveStreamingRecord(null, input), res, e);
+    private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
+        return (fc, input, res, e) -> onHiveRecordError(context, session, writers).apply(fc, new HiveStreamingRecord(null, input), res, e);
     }
 
     @Override
@@ -568,9 +580,55 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             return;
         }
 
+        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // Only allow one thread to work on a DB/table at a time
+        final Semaphore newSemaphore = new Semaphore(1);
+        Semaphore semaphore = tableSemaphoreMap.putIfAbsent(dbName + "." + tableName, newSemaphore);
+        if (semaphore == null) {
+            semaphore = newSemaphore;
+        }
+
+        boolean gotSemaphore = false;
+        try {
+            gotSemaphore = semaphore.tryAcquire(0, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            // Nothing to do, gotSemaphore defaults to false
+        }
+        if (!gotSemaphore) {
+            // We didn't get a chance to acquire, so rollback the session and try again next time
+            session.rollback();
+            return;
+        }
+
         final ComponentLog log = getLogger();
+        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
         final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
+        final Map<HiveEndPoint, HiveWriter> myWriters = new ConcurrentHashMap<>();
+        threadWriterList.add(myWriters);
+
+        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
+                .withTxnsPerBatch(txnsPerBatch)
+                .withAutoCreatePartitions(autoCreatePartitions)
+                .withMaxOpenConnections(maxConnections)
+                .withHeartBeatInterval(heartbeatInterval)
+                .withCallTimeout(callTimeout);
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            o = o.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+        }
+
+        final HiveOptions options = o;
+
         // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
         ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -673,7 +731,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         }
                         partitionValues.add(partitionValue.toString());
                     }
-                }, onRecordError(context, session))) {
+                }, onRecordError(context, session, myWriters))) {
                     continue;
                 }
 
@@ -684,13 +742,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
                 if (!exceptionHandler.execute(functionContext, record, input -> {
                     final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
-                    final HiveWriter hiveWriter = getOrCreateWriter(endPoint);
+                    final HiveWriter hiveWriter = getOrCreateWriter(myWriters, options, endPoint);
                     hiveWriterRef.set(hiveWriter);
 
                     hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
                     successfulRecords.get().add(record);
 
-                }, onHiveRecordError(context, session))) {
+                }, onHiveRecordError(context, session, myWriters))) {
                     continue;
                 }
 
@@ -706,7 +764,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                         // Now send the records to the success relationship and update the success count
                         flushSuccessfulRecords.run();
 
-                    }, onHiveRecordsError(context, session).andThen((fc, input, res, commitException) -> {
+                    }, onHiveRecordsError(context, session, myWriters).andThen((fc, input, res, commitException) -> {
                         // Reset hiveWriter for succeeding records.
                         switch (res.destination()) {
                             case Retry:
@@ -725,14 +783,14 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
                 exceptionHandler.execute(functionContext, successfulRecords.get(), input -> {
                     // Finish any transactions
-                    flushAllWriters(true);
-                    closeAllWriters();
+                    flushAllWriters(myWriters, true);
+                    closeAllWriters(myWriters);
 
                     // Now send any remaining records to the success relationship and update the count
                     flushSuccessfulRecords.run();
 
                     // Append successfulRecords on failure.
-                }, onHiveRecordsError(context, session));
+                }, onHiveRecordsError(context, session, myWriters));
 
             } catch (IOException ioe) {
                 // The Avro file is invalid (or may not be an Avro file at all), send it to failure
@@ -767,9 +825,11 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             result.routeTo(flowFile, REL_RETRY);
 
         } finally {
+            threadWriterList.remove(myWriters);
             functionContext.transferFlowFiles(session, result, options);
             // Restore original class loader, might not be necessary but is good practice since the processor task changed it
             Thread.currentThread().setContextClassLoader(originalClassloader);
+            semaphore.release();
         }
     }
 
@@ -777,24 +837,26 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     public void cleanup() {
         ComponentLog log = getLogger();
         sendHeartBeat.set(false);
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
-            try {
-                HiveWriter w = entry.getValue();
-                w.flushAndClose();
-            } catch (Exception ex) {
-                log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
-                if (ex instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
+        for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) {
+            for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+                try {
+                    HiveWriter w = entry.getValue();
+                    w.flushAndClose();
+                } catch (Exception ex) {
+                    log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
+                    if (ex instanceof InterruptedException) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
+            allWriters.clear();
         }
-        allWriters = Collections.emptyMap();
 
         if (callTimeoutPool != null) {
             callTimeoutPool.shutdown();
             try {
                 while (!callTimeoutPool.isTerminated()) {
-                    callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                    callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
                 }
             } catch (Throwable t) {
                 log.warn("shutdown interrupted on " + callTimeoutPool, t);
@@ -806,8 +868,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         hiveConfigurator.stopRenewer();
     }
 
-    private void setupHeartBeatTimer() {
-        if (options.getHeartBeatInterval() > 0) {
+    private void setupHeartBeatTimer(int heartbeatInterval) {
+        if (heartbeatInterval > 0) {
            final ComponentLog log = getLogger();
            heartBeatTimer.schedule(new TimerTask() {
                @Override
@@ -816,33 +878,35 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
                        if (sendHeartBeat.get()) {
                            log.debug("Start sending heartbeat on all writers");
                            sendHeartBeatOnAllWriters();
-                           setupHeartBeatTimer();
+                           setupHeartBeatTimer(heartbeatInterval);
                        }
                    } catch (Exception e) {
                        log.warn("Failed to heartbeat on HiveWriter ", e);
                    }
                }
-           }, options.getHeartBeatInterval() * 1000);
+           }, heartbeatInterval * 1000);
        }
    }
 
     private void sendHeartBeatOnAllWriters() throws InterruptedException {
-        for (HiveWriter writer : allWriters.values()) {
-            writer.heartBeat();
+        for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) {
+            for (HiveWriter writer : allWriters.values()) {
+                writer.heartBeat();
+            }
         }
     }
 
-    private void flushAllWriters(boolean rollToNext)
+    private void flushAllWriters(Map<HiveEndPoint, HiveWriter> writers, boolean rollToNext)
             throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
-        for (HiveWriter writer : allWriters.values()) {
+        for (HiveWriter writer : writers.values()) {
            writer.flush(rollToNext);
        }
    }
 
-    private void abortAndCloseWriters() {
+    private void abortAndCloseWriters(Map<HiveEndPoint, HiveWriter> writers) {
        try {
-            abortAllWriters();
-            closeAllWriters();
+            abortAllWriters(writers);
+            closeAllWriters(writers);
        } catch (Exception ie) {
            getLogger().warn("unable to close hive connections. ", ie);
        }
@@ -851,8 +915,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
      * Abort current Txn on all writers
      */
-    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+    private void abortAllWriters(Map<HiveEndPoint, HiveWriter> writers) throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            try {
                entry.getValue().abort();
            } catch (Exception e) {
@@ -864,9 +928,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
      * Closes all writers and remove them from cache
      */
-    private void closeAllWriters() {
+    private void closeAllWriters(Map<HiveEndPoint, HiveWriter> writers) {
        //1) Retire writers
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
@@ -874,25 +938,25 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
            }
        }
        //2) Clear cache
-        allWriters.clear();
+        writers.clear();
    }
 
-    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
+    private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> writers, HiveOptions options, HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        ComponentLog log = getLogger();
        try {
-            HiveWriter writer = allWriters.get(endPoint);
+            HiveWriter writer = writers.get(endPoint);
            if (writer == null) {
                log.debug("Creating Writer to Hive end point : " + endPoint);
                writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
-                if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
-                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{allWriters.size(), options.getMaxOpenConnections()});
-                    int retired = retireIdleWriters();
+                if (writers.size() > (options.getMaxOpenConnections() - 1)) {
+                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{writers.size(), options.getMaxOpenConnections()});
+                    int retired = retireIdleWriters(writers, options.getIdleTimeout());
                    if (retired == 0) {
-                        retireEldestWriter();
+                        retireEldestWriter(writers);
                    }
                }
-                allWriters.put(endPoint, writer);
-                HiveUtils.logAllHiveEndPoints(allWriters);
+                writers.put(endPoint, writer);
+                HiveUtils.logAllHiveEndPoints(writers);
            }
            return writer;
        } catch (HiveWriter.ConnectFailure e) {
@@ -904,13 +968,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     /**
      * Locate writer that has not been used for longest time and retire it
      */
-    private void retireEldestWriter() {
+    private void retireEldestWriter(Map<HiveEndPoint, HiveWriter> writers) {
        ComponentLog log = getLogger();
        log.info("Attempting close eldest writers");
 
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                eldest = entry.getKey();
                oldestTimeStamp = entry.getValue().getLastUsed();
@@ -918,7 +982,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
        }
        try {
            log.info("Closing least used Writer to Hive end point : " + eldest);
-            allWriters.remove(eldest).flushAndClose();
+            writers.remove(eldest).flushAndClose();
        } catch (IOException e) {
            log.warn("Failed to close writer for end point: " + eldest, e);
        } catch (InterruptedException e) {
@@ -934,7 +998,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     *
     * @return number of writers retired
     */
-    private int retireIdleWriters() {
+    private int retireIdleWriters(Map<HiveEndPoint, HiveWriter> writers, int idleTimeout) {
        ComponentLog log = getLogger();
        log.info("Attempting to close idle HiveWriters");
 
@@ -943,8 +1007,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
        ArrayList<HiveEndPoint> retirees = new ArrayList<>();
 
        //1) Find retirement candidates
-        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
-            if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
+            if (now - entry.getValue().getLastUsed() > idleTimeout) {
                ++count;
                retirees.add(entry.getKey());
            }
@@ -953,7 +1017,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
        for (HiveEndPoint ep : retirees) {
            try {
                log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
-                allWriters.remove(ep).flushAndClose();
+                writers.remove(ep).flushAndClose();
            } catch (IOException e) {
                log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, e);
            } catch (InterruptedException e) {