From 1e56de9521e4bc0752b419ffc7d62e096db1c389 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 29 Jan 2016 11:12:30 -0500 Subject: [PATCH] NIFI-1417 Exposing several connection settings on the Solr processors to help deal with timeouts and adding an @OnStopped method to close the SolrClient which didn't appear to be happening before This closes #194 --- .../apache/nifi/processors/solr/GetSolr.java | 6 ++ .../processors/solr/PutSolrContentStream.java | 6 ++ .../nifi/processors/solr/SolrProcessor.java | 92 ++++++++++++++++++- .../solr/TestPutSolrContentStream.java | 11 ++- 4 files changed, 105 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index a85aa0fb5f..0de5b089a8 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -139,6 +139,12 @@ public class GetSolr extends SolrProcessor { descriptors.add(SORT_CLAUSE); descriptors.add(DATE_FIELD); descriptors.add(BATCH_SIZE); + descriptors.add(SOLR_SOCKET_TIMEOUT); + descriptors.add(SOLR_CONNECTION_TIMEOUT); + descriptors.add(SOLR_MAX_CONNECTIONS); + descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST); + descriptors.add(ZK_CLIENT_TIMEOUT); + descriptors.add(ZK_CONNECTION_TIMEOUT); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index df034c9763..ca1628647f 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -123,6 +123,12 @@ public class PutSolrContentStream extends SolrProcessor { descriptors.add(CONTENT_STREAM_PATH); descriptors.add(CONTENT_TYPE); descriptors.add(COMMIT_WITHIN); + descriptors.add(SOLR_SOCKET_TIMEOUT); + descriptors.add(SOLR_CONNECTION_TIMEOUT); + descriptors.add(SOLR_MAX_CONNECTIONS); + descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST); + descriptors.add(ZK_CLIENT_TIMEOUT); + descriptors.add(ZK_CONNECTION_TIMEOUT); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java index 27f208aa3d..2941382e87 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -18,7 +18,9 @@ */ package org.apache.nifi.processors.solr; +import org.apache.http.client.HttpClient; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -28,12 +30,15 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.common.params.ModifiableSolrParams; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; /** * A base class for processors that interact with Apache Solr. @@ -71,6 +76,54 @@ public abstract class SolrProcessor extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor + .Builder().name("Solr Socket Timeout") + .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor + .Builder().name("Solr Connection Timeout") + .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor + .Builder().name("Solr Maximum Connections") + .description("The maximum number of total connections allowed from the Solr client to Solr.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10") + .build(); + + public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor + .Builder().name("Solr Maximum Connections Per Host") + .description("The maximum number of connections allowed from the Solr client to a single Solr host.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .build(); + + public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor + .Builder().name("ZooKeeper Client Timeout") + .description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor + .Builder().name("ZooKeeper Connection Timeout") + .description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("10 seconds") + .build(); + private volatile SolrClient solrClient; @OnScheduled @@ -78,6 +131,17 @@ public abstract class SolrProcessor extends AbstractProcessor { this.solrClient = createSolrClient(context); } + @OnStopped + public final void closeClient() { + if (solrClient != null) { + try { + solrClient.close(); + } catch (IOException e) { + getLogger().debug("Error closing SolrClient", e); + } + } + } + /** * Create a SolrClient based on the type of Solr specified. * @@ -86,13 +150,31 @@ public abstract class SolrProcessor extends AbstractProcessor { * @return an HttpSolrClient or CloudSolrClient */ protected SolrClient createSolrClient(final ProcessContext context) { + final String solrLocation = context.getProperty(SOLR_LOCATION).getValue(); + final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger(); + final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger(); + + final ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout); + params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost); + + final HttpClient httpClient = HttpClientUtil.createClient(params); + if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { - return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue()); + return new HttpSolrClient(solrLocation, httpClient); } else { - CloudSolrClient cloudSolrClient = new CloudSolrClient( - context.getProperty(SOLR_LOCATION).getValue()); - cloudSolrClient.setDefaultCollection( - context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue()); + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue(); + final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + + CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient); + cloudSolrClient.setDefaultCollection(collection); + cloudSolrClient.setZkClientTimeout(zkClientTimeout); + cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout); return cloudSolrClient; } } diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index 336b28721b..c309d78853 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -101,7 +101,7 @@ public class TestPutSolrContentStream { try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); - runner.run(); + runner.run(1, false); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); @@ -133,7 +133,7 @@ public class TestPutSolrContentStream { try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { runner.enqueue(fileIn); - runner.run(); + runner.run(1, false); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); @@ -159,7 +159,7 @@ public class TestPutSolrContentStream { try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); - runner.run(); + runner.run(1, false); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); @@ -185,7 +185,7 @@ public class TestPutSolrContentStream { try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); - runner.run(); + runner.run(1, false); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); @@ -225,7 +225,7 @@ public class TestPutSolrContentStream { // run the processor with a delete-by-query command runner.enqueue("first:bob".getBytes("UTF-8")); - runner.run(); + runner.run(1, false); // prove the document got deleted qResponse = solrClient.query(query); @@ -345,6 +345,7 @@ public class TestPutSolrContentStream { runner.assertValid(); } + @Test public void testSolrTypeStandardShouldNotRequireCollection() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);