NIFI-4583 Restructure nifi-solr-processors

Make all methods static

NIFI-4583 Restructure nifi-solr-processors R2

This closes #2285.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
U-WOODMARK\johannes.peter 2017-11-21 16:38:18 +01:00 committed by Koji Kawamura
parent 84cecfbeea
commit 9f919b9b65
7 changed files with 431 additions and 322 deletions

View File

@ -132,6 +132,17 @@
<version>${solr.version}</version> <version>${solr.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xmlunit</groupId>
<artifactId>xmlunit-matchers</artifactId>
<version>2.2.1</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -20,7 +20,6 @@ package org.apache.nifi.processors.solr;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
@ -29,7 +28,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -37,7 +35,6 @@ import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
@ -62,11 +59,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
@ -75,12 +67,24 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CursorMarkParams; import org.apache.solr.common.params.CursorMarkParams;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @Tags({"Apache", "Solr", "Get", "Pull", "Records"})
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") @CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
@ -367,13 +371,13 @@ public class GetSolr extends SolrProcessor {
doc.removeFields(dateField); doc.removeFields(dateField);
} }
} }
flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); flowFile = session.write(flowFile, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
} else { } else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(null, null); final RecordSchema schema = writerFactory.getSchema(null, null);
final RecordSet recordSet = solrDocumentsToRecordSet(response.getResults(), schema); final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer(); final StringBuffer mimeType = new StringBuffer();
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override @Override
@ -408,57 +412,6 @@ public class GetSolr extends SolrProcessor {
} }
} }
/**
* Writes each SolrDocument to a record.
*/
private RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
final List<Record> lr = new ArrayList<Record>();
for (SolrDocument doc : docs) {
final Map<String, Object> recordValues = new LinkedHashMap<>();
for (RecordField field : schema.getFields()){
final Object fieldValue = doc.getFieldValue(field.getFieldName());
if (fieldValue != null) {
if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
} else {
recordValues.put(field.getFieldName(), fieldValue);
}
}
}
lr.add(new MapRecord(schema, recordValues));
}
return new ListRecordSet(schema, lr);
}
/**
* Writes each SolrDocument in XML format to the OutputStream.
*/
private class QueryResponseOutputStreamCallback implements OutputStreamCallback {
private QueryResponse response;
public QueryResponseOutputStreamCallback(QueryResponse response) {
this.response = response;
}
@Override
public void process(OutputStream out) throws IOException {
IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
for (SolrDocument doc : response.getResults()) {
final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
IOUtils.write(xml, out, StandardCharsets.UTF_8);
}
IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
}
public SolrInputDocument toSolrInputDocument(SolrDocument d) {
final SolrInputDocument doc = new SolrInputDocument();
for (String name : d.getFieldNames()) {
doc.addField(name, d.getFieldValue(name));
}
return doc;
}
}
} }

View File

@ -57,6 +57,21 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
@Tags({"Apache", "Solr", "Put", "Send"}) @Tags({"Apache", "Solr", "Put", "Send"})
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")

View File

@ -19,34 +19,31 @@
package org.apache.nifi.processors.solr; package org.apache.nifi.processors.solr;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.solr.common.params.ModifiableSolrParams;
import javax.net.ssl.SSLContext;
import javax.security.auth.login.Configuration; import javax.security.auth.login.Configuration;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_STANDARD;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
/** /**
* A base class for processors that interact with Apache Solr. * A base class for processors that interact with Apache Solr.
@ -54,121 +51,6 @@ import java.util.concurrent.TimeUnit;
*/ */
public abstract class SolrProcessor extends AbstractProcessor { public abstract class SolrProcessor extends AbstractProcessor {
public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
"Cloud", "Cloud", "A SolrCloud instance.");
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
.Builder().name("Solr Location")
.description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
.Builder().name("Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor
.Builder().name("JAAS Client App Name")
.description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " +
"not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " +
"system property java.security.auth.login.config.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor
.Builder().name("Username")
.description("The username to use when Solr is configured with basic authentication.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor
.Builder().name("Password")
.description("The password to use when Solr is configured with basic authentication.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.sensitive(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Socket Timeout")
.description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Connection Timeout")
.description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor
.Builder().name("Solr Maximum Connections")
.description("The maximum number of total connections allowed from the Solr client to Solr.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor
.Builder().name("Solr Maximum Connections Per Host")
.description("The maximum number of connections allowed from the Solr client to a single Solr host.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Client Timeout")
.description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Connection Timeout")
.description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
private volatile SolrClient solrClient; private volatile SolrClient solrClient;
private volatile String solrLocation; private volatile String solrLocation;
private volatile String basicUsername; private volatile String basicUsername;
@ -205,47 +87,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
* @return an HttpSolrClient or CloudSolrClient * @return an HttpSolrClient or CloudSolrClient
*/ */
protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) { protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) {
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); return SolrUtils.createSolrClient(context, solrLocation);
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue();
final ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
// has to happen before the client is created below so that correct configurer would be set if neeeded
if (!StringUtils.isEmpty(jaasClientAppName)) {
System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName);
HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
}
final HttpClient httpClient = HttpClientUtil.createClient(params);
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory);
httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
}
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(solrLocation, httpClient);
} else {
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient);
cloudSolrClient.setDefaultCollection(collection);
cloudSolrClient.setZkClientTimeout(zkClientTimeout);
cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
return cloudSolrClient;
}
} }
/** /**

View File

@ -0,0 +1,284 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class SolrUtils {
public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
"Cloud", "Cloud", "A SolrCloud instance.");
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
.Builder().name("Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
.Builder().name("Solr Location")
.description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor
.Builder().name("Username")
.description("The username to use when Solr is configured with basic authentication.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor
.Builder().name("Password")
.description("The password to use when Solr is configured with basic authentication.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.sensitive(true)
.build();
public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor
.Builder().name("JAAS Client App Name")
.description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " +
"not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " +
"system property java.security.auth.login.config.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Socket Timeout")
.description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("Solr Connection Timeout")
.description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor
.Builder().name("Solr Maximum Connections")
.description("The maximum number of total connections allowed from the Solr client to Solr.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor
.Builder().name("Solr Maximum Connections Per Host")
.description("The maximum number of connections allowed from the Solr client to a single Solr host.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Client Timeout")
.description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor
.Builder().name("ZooKeeper Connection Timeout")
.description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
.defaultValue("10 seconds")
.build();
public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue();
final ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
// has to happen before the client is created below so that correct configurer would be set if neeeded
if (!StringUtils.isEmpty(jaasClientAppName)) {
System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName);
HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
}
final HttpClient httpClient = HttpClientUtil.createClient(params);
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory);
httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
}
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(solrLocation, httpClient);
} else {
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient);
cloudSolrClient.setDefaultCollection(collection);
cloudSolrClient.setZkClientTimeout(zkClientTimeout);
cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
return cloudSolrClient;
}
}
/**
* Writes each SolrDocument to a record.
*/
public static RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
final List<Record> lr = new ArrayList<Record>();
for (SolrDocument doc : docs) {
final Map<String, Object> recordValues = new LinkedHashMap<>();
for (RecordField field : schema.getFields()){
final Object fieldValue = doc.getFieldValue(field.getFieldName());
if (fieldValue != null) {
if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
} else {
recordValues.put(field.getFieldName(), fieldValue);
}
}
}
lr.add(new MapRecord(schema, recordValues));
}
return new ListRecordSet(schema, lr);
}
public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
return new QueryResponseOutputStreamCallback(response);
}
/**
* Writes each SolrDocument in XML format to the OutputStream.
*/
private static class QueryResponseOutputStreamCallback implements OutputStreamCallback {
private QueryResponse response;
public QueryResponseOutputStreamCallback(QueryResponse response) {
this.response = response;
}
@Override
public void process(OutputStream out) throws IOException {
IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
for (SolrDocument doc : response.getResults()) {
final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
IOUtils.write(xml, out, StandardCharsets.UTF_8);
}
IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
}
public SolrInputDocument toSolrInputDocument(SolrDocument d) {
final SolrInputDocument doc = new SolrInputDocument();
for (String name : d.getFieldNames()) {
doc.addField(name, d.getFieldValue(name));
}
return doc;
}
}
}

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.nifi.processors.solr; package org.apache.nifi.processors.solr;
import com.google.gson.stream.JsonReader;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -34,8 +35,11 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xmlunit.matchers.CompareMatcher;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -43,6 +47,9 @@ import java.util.Date;
import java.util.Locale; import java.util.Locale;
import java.util.TimeZone; import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class TestGetSolr { public class TestGetSolr {
static final String DEFAULT_SOLR_CORE = "testCollection"; static final String DEFAULT_SOLR_CORE = "testCollection";
@ -91,18 +98,23 @@ public class TestGetSolr {
} }
} }
private static TestRunner createDefaultTestRunner(GetSolr processor) {
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(SolrUtils.COLLECTION, "testCollection");
return runner;
}
@Test @Test
public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "20"); runner.setProperty(GetSolr.BATCH_SIZE, "20");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
@ -112,15 +124,9 @@ public class TestGetSolr {
public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000"); runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
@ -135,12 +141,8 @@ public class TestGetSolr {
public void testValidation() throws IOException, SolrServerException { public void testValidation() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "2"); runner.setProperty(GetSolr.BATCH_SIZE, "2");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue()); runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue());
runner.run(1); runner.run(1);
@ -150,14 +152,8 @@ public class TestGetSolr {
public void testCompletenessDespiteUpdates() throws IOException, SolrServerException { public void testCompletenessDespiteUpdates() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true); runner.run(1,false, true);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
@ -185,14 +181,8 @@ public class TestGetSolr {
public void testCompletenessDespiteDeletions() throws IOException, SolrServerException { public void testCompletenessDespiteDeletions() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true); runner.run(1,false, true);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
@ -227,15 +217,9 @@ public class TestGetSolr {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter)); runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter));
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
SolrInputDocument doc10 = new SolrInputDocument(); SolrInputDocument doc10 = new SolrInputDocument();
doc10.addField("id", "doc10"); doc10.addField("id", "doc10");
@ -258,14 +242,8 @@ public class TestGetSolr {
public void testPropertyModified() throws IOException, SolrServerException { public void testPropertyModified() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true); runner.run(1,false, true);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
@ -291,14 +269,8 @@ public class TestGetSolr {
public void testStateCleared() throws IOException, SolrServerException { public void testStateCleared() throws IOException, SolrServerException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "1"); runner.setProperty(GetSolr.BATCH_SIZE, "1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
runner.run(1,false, true); runner.run(1,false, true);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
@ -317,20 +289,16 @@ public class TestGetSolr {
runner.assertQueueEmpty(); runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
runner.clearTransferState(); runner.clearTransferState();
} }
@Test @Test
public void testRecordWriter() throws IOException, SolrServerException, InitializationException { public void testRecordWriter() throws IOException, SolrServerException, InitializationException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(GetSolr.RETURN_FIELDS, "id,created,integer_single");
runner.setProperty(GetSolr.DATE_FIELD, "created"); runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.setProperty(GetSolr.BATCH_SIZE, "2");
runner.setProperty(GetSolr.COLLECTION, "testCollection");
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
@ -345,10 +313,46 @@ public class TestGetSolr {
runner.run(1,true, true); runner.run(1,true, true);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 5); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
// Check for valid json
JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0)))));
reader.beginArray();
int controlScore = 0;
while (reader.hasNext()) {
reader.beginObject();
while (reader.hasNext()) {
if (reader.nextName().equals("integer_single"))
controlScore += reader.nextInt();
else
reader.skipValue();
}
reader.endObject();
}
assertEquals(controlScore, 45);
} }
@Test
public void testForValidXml() throws IOException, SolrServerException, InitializationException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.SOLR_QUERY, "id:doc1");
runner.setProperty(GetSolr.RETURN_FIELDS, "id");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run(1,true, true);
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0)))));
}
// Override createSolrClient and return the passed in SolrClient // Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends GetSolr { private class TestableProcessor extends GetSolr {
private SolrClient solrClient; private SolrClient solrClient;

View File

@ -94,8 +94,8 @@ public class TestPutSolrContentStream {
*/ */
private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) { private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) {
TestRunner runner = TestRunners.newTestRunner(processor); TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
return runner; return runner;
} }
@ -250,9 +250,9 @@ public class TestPutSolrContentStream {
final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection); final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection);
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:9983"); runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:9983");
runner.setProperty(PutSolrContentStream.COLLECTION, "${solr.collection}"); runner.setProperty(SolrUtils.COLLECTION, "${solr.collection}");
final Map<String,String> attributes = new HashMap<>(); final Map<String,String> attributes = new HashMap<>();
attributes.put("solr.collection", collection); attributes.put("solr.collection", collection);
@ -349,11 +349,11 @@ public class TestPutSolrContentStream {
@Test @Test
public void testSolrTypeCloudShouldRequireCollection() { public void testSolrTypeCloudShouldRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertNotValid(); runner.assertNotValid();
runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1"); runner.setProperty(SolrUtils.COLLECTION, "someCollection1");
runner.assertValid(); runner.assertValid();
} }
@ -361,64 +361,64 @@ public class TestPutSolrContentStream {
@Test @Test
public void testSolrTypeStandardShouldNotRequireCollection() { public void testSolrTypeStandardShouldNotRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid(); runner.assertValid();
} }
@Test @Test
public void testHttpsUrlShouldRequireSSLContext() throws InitializationException { public void testHttpsUrlShouldRequireSSLContext() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "https://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "https://localhost:8443/solr");
runner.assertNotValid(); runner.assertNotValid();
final SSLContextService sslContextService = new MockSSLContextService(); final SSLContextService sslContextService = new MockSSLContextService();
runner.addControllerService("ssl-context", sslContextService); runner.addControllerService("ssl-context", sslContextService);
runner.enableControllerService(sslContextService); runner.enableControllerService(sslContextService);
runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
runner.assertValid(); runner.assertValid();
} }
@Test @Test
public void testHttpUrlShouldNotAllowSSLContext() throws InitializationException { public void testHttpUrlShouldNotAllowSSLContext() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid(); runner.assertValid();
final SSLContextService sslContextService = new MockSSLContextService(); final SSLContextService sslContextService = new MockSSLContextService();
runner.addControllerService("ssl-context", sslContextService); runner.addControllerService("ssl-context", sslContextService);
runner.enableControllerService(sslContextService); runner.enableControllerService(sslContextService);
runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
runner.assertNotValid(); runner.assertNotValid();
} }
@Test @Test
public void testUsernamePasswordValidation() { public void testUsernamePasswordValidation() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid(); runner.assertValid();
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "user1"); runner.setProperty(SolrUtils.BASIC_USERNAME, "user1");
runner.assertNotValid(); runner.assertNotValid();
runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "password"); runner.setProperty(SolrUtils.BASIC_PASSWORD, "password");
runner.assertValid(); runner.assertValid();
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, ""); runner.setProperty(SolrUtils.BASIC_USERNAME, "");
runner.assertNotValid(); runner.assertNotValid();
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "${solr.user}"); runner.setProperty(SolrUtils.BASIC_USERNAME, "${solr.user}");
runner.assertNotValid(); runner.assertNotValid();
runner.setVariable("solr.user", "solrRocks"); runner.setVariable("solr.user", "solrRocks");
runner.assertValid(); runner.assertValid();
runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "${solr.password}"); runner.setProperty(SolrUtils.BASIC_PASSWORD, "${solr.password}");
runner.assertNotValid(); runner.assertNotValid();
runner.setVariable("solr.password", "solrRocksPassword"); runner.setVariable("solr.password", "solrRocksPassword");
@ -428,8 +428,8 @@ public class TestPutSolrContentStream {
@Test @Test
public void testJAASClientAppNameValidation() { public void testJAASClientAppNameValidation() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid(); runner.assertValid();
// clear the jaas config system property if it was set // clear the jaas config system property if it was set
@ -439,7 +439,7 @@ public class TestPutSolrContentStream {
} }
// should be invalid if we have a client name but not config file // should be invalid if we have a client name but not config file
runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "Client"); runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "Client");
runner.assertNotValid(); runner.assertNotValid();
// should be invalid if we have a client name that is not in the config file // should be invalid if we have a client name that is not in the config file
@ -448,7 +448,7 @@ public class TestPutSolrContentStream {
runner.assertNotValid(); runner.assertNotValid();
// should be valid now that the name matches up with the config file // should be valid now that the name matches up with the config file
runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "SolrJClient"); runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "SolrJClient");
runner.assertValid(); runner.assertValid();
} }