NIFI-1417 Exposing several connection settings on the Solr processors to help deal with timeouts and adding an @OnStopped method to close the SolrClient which didn't appear to be happening before

This closes #194
This commit is contained in:
Bryan Bende 2016-01-29 11:12:30 -05:00
parent ef80549d63
commit 1e56de9521
4 changed files with 105 additions and 10 deletions

View File

@ -139,6 +139,12 @@ public class GetSolr extends SolrProcessor {
descriptors.add(SORT_CLAUSE); descriptors.add(SORT_CLAUSE);
descriptors.add(DATE_FIELD); descriptors.add(DATE_FIELD);
descriptors.add(BATCH_SIZE); descriptors.add(BATCH_SIZE);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
descriptors.add(ZK_CLIENT_TIMEOUT);
descriptors.add(ZK_CONNECTION_TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors); this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();

View File

@ -123,6 +123,12 @@ public class PutSolrContentStream extends SolrProcessor {
descriptors.add(CONTENT_STREAM_PATH); descriptors.add(CONTENT_STREAM_PATH);
descriptors.add(CONTENT_TYPE); descriptors.add(CONTENT_TYPE);
descriptors.add(COMMIT_WITHIN); descriptors.add(COMMIT_WITHIN);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
descriptors.add(ZK_CLIENT_TIMEOUT);
descriptors.add(ZK_CONNECTION_TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors); this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();

View File

@ -18,7 +18,9 @@
*/ */
package org.apache.nifi.processors.solr; package org.apache.nifi.processors.solr;
import org.apache.http.client.HttpClient;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
@ -28,12 +30,15 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.params.ModifiableSolrParams;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* A base class for processors that interact with Apache Solr. * A base class for processors that interact with Apache Solr.
@ -71,6 +76,54 @@ public abstract class SolrProcessor extends AbstractProcessor {
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Socket Timeout")
.description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Connection Timeout")
.description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor
.Builder().name("Solr Maximum Connections")
.description("The maximum number of total connections allowed from the Solr client to Solr.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor
.Builder().name("Solr Maximum Connections Per Host")
.description("The maximum number of connections allowed from the Solr client to a single Solr host.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Client Timeout")
.description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Connection Timeout")
.description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
private volatile SolrClient solrClient; private volatile SolrClient solrClient;
@OnScheduled @OnScheduled
@ -78,6 +131,17 @@ public abstract class SolrProcessor extends AbstractProcessor {
this.solrClient = createSolrClient(context); this.solrClient = createSolrClient(context);
} }
@OnStopped
public final void closeClient() {
if (solrClient != null) {
try {
solrClient.close();
} catch (IOException e) {
getLogger().debug("Error closing SolrClient", e);
}
}
}
/** /**
* Create a SolrClient based on the type of Solr specified. * Create a SolrClient based on the type of Solr specified.
* *
@ -86,13 +150,31 @@ public abstract class SolrProcessor extends AbstractProcessor {
* @return an HttpSolrClient or CloudSolrClient * @return an HttpSolrClient or CloudSolrClient
*/ */
protected SolrClient createSolrClient(final ProcessContext context) { protected SolrClient createSolrClient(final ProcessContext context) {
final String solrLocation = context.getProperty(SOLR_LOCATION).getValue();
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
final ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
final HttpClient httpClient = HttpClientUtil.createClient(params);
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue()); return new HttpSolrClient(solrLocation, httpClient);
} else { } else {
CloudSolrClient cloudSolrClient = new CloudSolrClient( final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
context.getProperty(SOLR_LOCATION).getValue()); final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
cloudSolrClient.setDefaultCollection( final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient);
cloudSolrClient.setDefaultCollection(collection);
cloudSolrClient.setZkClientTimeout(zkClientTimeout);
cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
return cloudSolrClient; return cloudSolrClient;
} }
} }

View File

@ -101,7 +101,7 @@ public class TestPutSolrContentStream {
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn); runner.enqueue(fileIn);
runner.run(); runner.run(1, false);
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@ -133,7 +133,7 @@ public class TestPutSolrContentStream {
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn); runner.enqueue(fileIn);
runner.run(); runner.run(1, false);
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@ -159,7 +159,7 @@ public class TestPutSolrContentStream {
try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn); runner.enqueue(fileIn);
runner.run(); runner.run(1, false);
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@ -185,7 +185,7 @@ public class TestPutSolrContentStream {
try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) { try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn); runner.enqueue(fileIn);
runner.run(); runner.run(1, false);
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
@ -225,7 +225,7 @@ public class TestPutSolrContentStream {
// run the processor with a delete-by-query command // run the processor with a delete-by-query command
runner.enqueue("<delete><query>first:bob</query></delete>".getBytes("UTF-8")); runner.enqueue("<delete><query>first:bob</query></delete>".getBytes("UTF-8"));
runner.run(); runner.run(1, false);
// prove the document got deleted // prove the document got deleted
qResponse = solrClient.query(query); qResponse = solrClient.query(query);
@ -345,6 +345,7 @@ public class TestPutSolrContentStream {
runner.assertValid(); runner.assertValid();
} }
@Test @Test
public void testSolrTypeStandardShouldNotRequireCollection() { public void testSolrTypeStandardShouldNotRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);