From 9f919b9b650bbec647d1d5acbc1a63b721ce84e3 Mon Sep 17 00:00:00 2001 From: "U-WOODMARK\\johannes.peter" Date: Tue, 21 Nov 2017 16:38:18 +0100 Subject: [PATCH] NIFI-4583 Restructure nifi-solr-processors Make all methods static NIFI-4583 Restructure nifi-solr-processors R2 This closes #2285. Signed-off-by: Koji Kawamura --- .../nifi-solr-processors/pom.xml | 11 + .../apache/nifi/processors/solr/GetSolr.java | 79 +---- .../processors/solr/PutSolrContentStream.java | 15 + .../nifi/processors/solr/SolrProcessor.java | 180 +---------- .../nifi/processors/solr/SolrUtils.java | 284 ++++++++++++++++++ .../nifi/processors/solr/TestGetSolr.java | 130 ++++---- .../solr/TestPutSolrContentStream.java | 54 ++-- 7 files changed, 431 insertions(+), 322 deletions(-) mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml old mode 100644 new mode 100755 index f35927ef4d..920ce73369 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -132,6 +132,17 @@ ${solr.version} test + + com.google.code.gson + gson + test + + + org.xmlunit + xmlunit-matchers + 2.2.1 + test + diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index 1871f1c8f2..3435ce446e 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -20,7 +20,6 @@ package org.apache.nifi.processors.solr; import java.io.IOException; import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -29,7 +28,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -37,7 +35,6 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -62,11 +59,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.record.ListRecordSet; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.StringUtils; @@ -75,12 +67,24 @@ import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; -import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.CursorMarkParams; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; + @Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") @@ -367,13 +371,13 @@ public class GetSolr extends SolrProcessor { doc.removeFields(dateField); } } - flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); + flowFile = session.write(flowFile, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response)); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); } else { final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSchema schema = writerFactory.getSchema(null, null); - final RecordSet recordSet = solrDocumentsToRecordSet(response.getResults(), schema); + final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); final StringBuffer mimeType = new StringBuffer(); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override @@ -408,57 +412,6 @@ public class GetSolr extends SolrProcessor { } } - /** - * Writes each SolrDocument to a record. - */ - private RecordSet solrDocumentsToRecordSet(final List docs, final RecordSchema schema) { - final List lr = new ArrayList(); - for (SolrDocument doc : docs) { - final Map recordValues = new LinkedHashMap<>(); - for (RecordField field : schema.getFields()){ - final Object fieldValue = doc.getFieldValue(field.getFieldName()); - if (fieldValue != null) { - if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){ - recordValues.put(field.getFieldName(), ((List) fieldValue).toArray()); - } else { - recordValues.put(field.getFieldName(), fieldValue); - } - } - } - lr.add(new MapRecord(schema, recordValues)); - } - return new ListRecordSet(schema, lr); - } - /** - * Writes each SolrDocument in XML format to the OutputStream. - */ - private class QueryResponseOutputStreamCallback implements OutputStreamCallback { - private QueryResponse response; - - public QueryResponseOutputStreamCallback(QueryResponse response) { - this.response = response; - } - - @Override - public void process(OutputStream out) throws IOException { - IOUtils.write("", out, StandardCharsets.UTF_8); - for (SolrDocument doc : response.getResults()) { - final String xml = ClientUtils.toXML(toSolrInputDocument(doc)); - IOUtils.write(xml, out, StandardCharsets.UTF_8); - } - IOUtils.write("", out, StandardCharsets.UTF_8); - } - - public SolrInputDocument toSolrInputDocument(SolrDocument d) { - final SolrInputDocument doc = new SolrInputDocument(); - - for (String name : d.getFieldNames()) { - doc.addField(name, d.getFieldValue(name)); - } - - return doc; - } - } } diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java old mode 100644 new mode 100755 index dc1830cd93..69bc071e99 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -57,6 +57,21 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; + @Tags({"Apache", "Solr", "Put", "Send"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java old mode 100644 new mode 100755 index cd29f7cd25..7c732e946f --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -19,34 +19,31 @@ package org.apache.nifi.processors.solr; import org.apache.commons.lang3.StringUtils; -import org.apache.http.client.HttpClient; -import org.apache.http.conn.scheme.Scheme; -import org.apache.http.conn.ssl.SSLSocketFactory; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; -import org.apache.solr.common.params.ModifiableSolrParams; -import javax.net.ssl.SSLContext; import javax.security.auth.login.Configuration; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_STANDARD; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; /** * A base class for processors that interact with Apache Solr. @@ -54,121 +51,6 @@ import java.util.concurrent.TimeUnit; */ public abstract class SolrProcessor extends AbstractProcessor { - public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue( - "Cloud", "Cloud", "A SolrCloud instance."); - - public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( - "Standard", "Standard", "A stand-alone Solr instance."); - - public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor - .Builder().name("Solr Type") - .description("The type of Solr instance, Cloud or Standard.") - .required(true) - .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD) - .defaultValue(SOLR_TYPE_STANDARD.getValue()) - .build(); - - public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor - .Builder().name("Solr Location") - .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " + - "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .expressionLanguageSupported(true) - .build(); - - public static final PropertyDescriptor COLLECTION = new PropertyDescriptor - .Builder().name("Collection") - .description("The Solr collection name, only used with a Solr Type of Cloud") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - - public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor - .Builder().name("JAAS Client App Name") - .description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " + - "not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " + - "system property java.security.auth.login.config.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor - .Builder().name("Username") - .description("The username to use when Solr is configured with basic authentication.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .expressionLanguageSupported(true) - .build(); - - public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor - .Builder().name("Password") - .description("The password to use when Solr is configured with basic authentication.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .expressionLanguageSupported(true) - .sensitive(true) - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor - .Builder().name("Solr Socket Timeout") - .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("10 seconds") - .build(); - - public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor - .Builder().name("Solr Connection Timeout") - .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("10 seconds") - .build(); - - public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor - .Builder().name("Solr Maximum Connections") - .description("The maximum number of total connections allowed from the Solr client to Solr.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10") - .build(); - - public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor - .Builder().name("Solr Maximum Connections Per Host") - .description("The maximum number of connections allowed from the Solr client to a single Solr host.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("5") - .build(); - - public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor - .Builder().name("ZooKeeper Client Timeout") - .description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.") - .required(false) - .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) - .defaultValue("10 seconds") - .build(); - - public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor - .Builder().name("ZooKeeper Connection Timeout") - .description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.") - .required(false) - .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) - .defaultValue("10 seconds") - .build(); - private volatile SolrClient solrClient; private volatile String solrLocation; private volatile String basicUsername; @@ -205,47 +87,7 @@ public abstract class SolrProcessor extends AbstractProcessor { * @return an HttpSolrClient or CloudSolrClient */ protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) { - final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger(); - final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger(); - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue(); - - final ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout); - params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout); - params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections); - params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost); - - // has to happen before the client is created below so that correct configurer would be set if neeeded - if (!StringUtils.isEmpty(jaasClientAppName)) { - System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName); - HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); - } - - final HttpClient httpClient = HttpClientUtil.createClient(params); - - if (sslContextService != null) { - final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); - final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext); - final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory); - httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme); - } - - if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { - return new HttpSolrClient(solrLocation, httpClient); - } else { - final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue(); - final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - - CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient); - cloudSolrClient.setDefaultCollection(collection); - cloudSolrClient.setZkClientTimeout(zkClientTimeout); - cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout); - return cloudSolrClient; - } + return SolrUtils.createSolrClient(context, solrLocation); } /** diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java new file mode 100755 index 0000000000..8977b7ca98 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.record.ListRecordSet; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class SolrUtils { + + public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue( + "Cloud", "Cloud", "A SolrCloud instance."); + + public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( + "Standard", "Standard", "A stand-alone Solr instance."); + + public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor + .Builder().name("Solr Type") + .description("The type of Solr instance, Cloud or Standard.") + .required(true) + .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD) + .defaultValue(SOLR_TYPE_STANDARD.getValue()) + .build(); + + public static final PropertyDescriptor COLLECTION = new PropertyDescriptor + .Builder().name("Collection") + .description("The Solr collection name, only used with a Solr Type of Cloud") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor + .Builder().name("Solr Location") + .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " + + "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor + .Builder().name("Username") + .description("The username to use when Solr is configured with basic authentication.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor + .Builder().name("Password") + .description("The password to use when Solr is configured with basic authentication.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(true) + .sensitive(true) + .build(); + + public static final PropertyDescriptor JAAS_CLIENT_APP_NAME = new PropertyDescriptor + .Builder().name("JAAS Client App Name") + .description("The name of the JAAS configuration entry to use when performing Kerberos authentication to Solr. If this property is " + + "not provided, Kerberos authentication will not be attempted. The value must match an entry in the file specified by the " + + "system property java.security.auth.login.config.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor + .Builder().name("Solr Socket Timeout") + .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor + .Builder().name("Solr Connection Timeout") + .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor SOLR_MAX_CONNECTIONS = new PropertyDescriptor + .Builder().name("Solr Maximum Connections") + .description("The maximum number of total connections allowed from the Solr client to Solr.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10") + .build(); + + public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor + .Builder().name("Solr Maximum Connections Per Host") + .description("The maximum number of connections allowed from the Solr client to a single Solr host.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .build(); + + public static final PropertyDescriptor ZK_CLIENT_TIMEOUT = new PropertyDescriptor + .Builder().name("ZooKeeper Client Timeout") + .description("The amount of time to wait for data on a connection to ZooKeeper, only used with a Solr Type of Cloud.") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("10 seconds") + .build(); + + public static final PropertyDescriptor ZK_CONNECTION_TIMEOUT = new PropertyDescriptor + .Builder().name("ZooKeeper Connection Timeout") + .description("The amount of time to wait when establishing a connection to ZooKeeper, only used with a Solr Type of Cloud.") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("10 seconds") + .build(); + + public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) { + final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger(); + final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String jaasClientAppName = context.getProperty(JAAS_CLIENT_APP_NAME).getValue(); + + final ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout); + params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost); + + // has to happen before the client is created below so that correct configurer would be set if neeeded + if (!StringUtils.isEmpty(jaasClientAppName)) { + System.setProperty("solr.kerberos.jaas.appname", jaasClientAppName); + HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + } + + final HttpClient httpClient = HttpClientUtil.createClient(params); + + if (sslContextService != null) { + final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext); + final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory); + httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme); + } + + if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { + return new HttpSolrClient(solrLocation, httpClient); + } else { + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue(); + final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + + CloudSolrClient cloudSolrClient = new CloudSolrClient(solrLocation, httpClient); + cloudSolrClient.setDefaultCollection(collection); + cloudSolrClient.setZkClientTimeout(zkClientTimeout); + cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout); + return cloudSolrClient; + } + } + + + + /** + * Writes each SolrDocument to a record. + */ + public static RecordSet solrDocumentsToRecordSet(final List docs, final RecordSchema schema) { + final List lr = new ArrayList(); + + for (SolrDocument doc : docs) { + final Map recordValues = new LinkedHashMap<>(); + for (RecordField field : schema.getFields()){ + final Object fieldValue = doc.getFieldValue(field.getFieldName()); + if (fieldValue != null) { + if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){ + recordValues.put(field.getFieldName(), ((List) fieldValue).toArray()); + } else { + recordValues.put(field.getFieldName(), fieldValue); + } + } + } + lr.add(new MapRecord(schema, recordValues)); + } + return new ListRecordSet(schema, lr); + } + + + public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) { + return new QueryResponseOutputStreamCallback(response); + } + + /** + * Writes each SolrDocument in XML format to the OutputStream. + */ + private static class QueryResponseOutputStreamCallback implements OutputStreamCallback { + private QueryResponse response; + + public QueryResponseOutputStreamCallback(QueryResponse response) { + this.response = response; + } + + @Override + public void process(OutputStream out) throws IOException { + IOUtils.write("", out, StandardCharsets.UTF_8); + for (SolrDocument doc : response.getResults()) { + final String xml = ClientUtils.toXML(toSolrInputDocument(doc)); + IOUtils.write(xml, out, StandardCharsets.UTF_8); + } + IOUtils.write("", out, StandardCharsets.UTF_8); + } + + public SolrInputDocument toSolrInputDocument(SolrDocument d) { + final SolrInputDocument doc = new SolrInputDocument(); + + for (String name : d.getFieldNames()) { + doc.addField(name, d.getFieldValue(name)); + } + + return doc; + } + } + + +} diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java index cb3d9c53eb..af5f3dd41e 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -18,6 +18,7 @@ */ package org.apache.nifi.processors.solr; +import com.google.gson.stream.JsonReader; import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -34,8 +35,11 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; import java.text.SimpleDateFormat; @@ -43,6 +47,9 @@ import java.util.Date; import java.util.Locale; import java.util.TimeZone; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + public class TestGetSolr { static final String DEFAULT_SOLR_CORE = "testCollection"; @@ -91,18 +98,23 @@ public class TestGetSolr { } } + private static TestRunner createDefaultTestRunner(GetSolr processor) { + TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(SolrUtils.COLLECTION, "testCollection"); + return runner; + } + @Test public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "20"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); @@ -112,15 +124,9 @@ public class TestGetSolr { public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); @@ -135,12 +141,8 @@ public class TestGetSolr { public void testValidation() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "2"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue()); runner.run(1); @@ -150,14 +152,8 @@ public class TestGetSolr { public void testCompletenessDespiteUpdates() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(1,false, true); runner.assertQueueEmpty(); @@ -185,14 +181,8 @@ public class TestGetSolr { public void testCompletenessDespiteDeletions() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(1,false, true); runner.assertQueueEmpty(); @@ -227,15 +217,9 @@ public class TestGetSolr { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter)); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); SolrInputDocument doc10 = new SolrInputDocument(); doc10.addField("id", "doc10"); @@ -258,14 +242,8 @@ public class TestGetSolr { public void testPropertyModified() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(1,false, true); runner.assertQueueEmpty(); @@ -291,14 +269,8 @@ public class TestGetSolr { public void testStateCleared() throws IOException, SolrServerException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); + TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(GetSolr.BATCH_SIZE, "1"); - runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(1,false, true); runner.assertQueueEmpty(); @@ -317,20 +289,16 @@ public class TestGetSolr { runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); runner.clearTransferState(); - - } @Test public void testRecordWriter() throws IOException, SolrServerException, InitializationException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.DATE_FIELD, "created"); - runner.setProperty(GetSolr.BATCH_SIZE, "2"); - runner.setProperty(GetSolr.COLLECTION, "testCollection"); + TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue()); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created,integer_single"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); @@ -345,10 +313,46 @@ public class TestGetSolr { runner.run(1,true, true); runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 5); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); + + // Check for valid json + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0))))); + reader.beginArray(); + int controlScore = 0; + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("integer_single")) + controlScore += reader.nextInt(); + else + reader.skipValue(); + } + reader.endObject(); + } + assertEquals(controlScore, 45); } + @Test + public void testForValidXml() throws IOException, SolrServerException, InitializationException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); + + TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(GetSolr.SOLR_QUERY, "id:doc1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); + + runner.run(1,true, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); + runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); + + String expectedXml = "doc1"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GetSolr.REL_SUCCESS).get(0))))); + } + + // Override createSolrClient and return the passed in SolrClient private class TestableProcessor extends GetSolr { private SolrClient solrClient; diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java old mode 100644 new mode 100755 index 18948c6f5e..f4eeac27b1 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -94,8 +94,8 @@ public class TestPutSolrContentStream { */ private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) { TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); return runner; } @@ -250,9 +250,9 @@ public class TestPutSolrContentStream { final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection); final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:9983"); - runner.setProperty(PutSolrContentStream.COLLECTION, "${solr.collection}"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:9983"); + runner.setProperty(SolrUtils.COLLECTION, "${solr.collection}"); final Map attributes = new HashMap<>(); attributes.put("solr.collection", collection); @@ -349,11 +349,11 @@ public class TestPutSolrContentStream { @Test public void testSolrTypeCloudShouldRequireCollection() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertNotValid(); - runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1"); + runner.setProperty(SolrUtils.COLLECTION, "someCollection1"); runner.assertValid(); } @@ -361,64 +361,64 @@ public class TestPutSolrContentStream { @Test public void testSolrTypeStandardShouldNotRequireCollection() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertValid(); } @Test public void testHttpsUrlShouldRequireSSLContext() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "https://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "https://localhost:8443/solr"); runner.assertNotValid(); final SSLContextService sslContextService = new MockSSLContextService(); runner.addControllerService("ssl-context", sslContextService); runner.enableControllerService(sslContextService); - runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context"); runner.assertValid(); } @Test public void testHttpUrlShouldNotAllowSSLContext() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertValid(); final SSLContextService sslContextService = new MockSSLContextService(); runner.addControllerService("ssl-context", sslContextService); runner.enableControllerService(sslContextService); - runner.setProperty(PutSolrContentStream.SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context"); runner.assertNotValid(); } @Test public void testUsernamePasswordValidation() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertValid(); - runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "user1"); + runner.setProperty(SolrUtils.BASIC_USERNAME, "user1"); runner.assertNotValid(); - runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "password"); + runner.setProperty(SolrUtils.BASIC_PASSWORD, "password"); runner.assertValid(); - runner.setProperty(PutSolrContentStream.BASIC_USERNAME, ""); + runner.setProperty(SolrUtils.BASIC_USERNAME, ""); runner.assertNotValid(); - runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "${solr.user}"); + runner.setProperty(SolrUtils.BASIC_USERNAME, "${solr.user}"); runner.assertNotValid(); runner.setVariable("solr.user", "solrRocks"); runner.assertValid(); - runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "${solr.password}"); + runner.setProperty(SolrUtils.BASIC_PASSWORD, "${solr.password}"); runner.assertNotValid(); runner.setVariable("solr.password", "solrRocksPassword"); @@ -428,8 +428,8 @@ public class TestPutSolrContentStream { @Test public void testJAASClientAppNameValidation() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertValid(); // clear the jaas config system property if it was set @@ -439,7 +439,7 @@ public class TestPutSolrContentStream { } // should be invalid if we have a client name but not config file - runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "Client"); + runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "Client"); runner.assertNotValid(); // should be invalid if we have a client name that is not in the config file @@ -448,7 +448,7 @@ public class TestPutSolrContentStream { runner.assertNotValid(); // should be valid now that the name matches up with the config file - runner.setProperty(PutSolrContentStream.JAAS_CLIENT_APP_NAME, "SolrJClient"); + runner.setProperty(SolrUtils.JAAS_CLIENT_APP_NAME, "SolrJClient"); runner.assertValid(); }