Merge branch 'nifi-solr-bundle' of https://github.com/bbende/incubator-nifi into solr

This commit is contained in:
Mark Payne 2015-05-01 11:59:29 -04:00
commit 3ddfeaf072
27 changed files with 1981 additions and 24 deletions

View File

@ -1,13 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
@ -163,6 +163,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-nar</artifactId>
@ -297,7 +302,7 @@ language governing permissions and limitations under the License. -->
<nifi.security.ocsp.responder.url />
<nifi.security.ocsp.responder.certificate />
<!-- nifi.properties: cluster common properties (cluster manager and nodes
<!-- nifi.properties: cluster common properties (cluster manager and nodes
must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
@ -310,7 +315,7 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.protocol.multicast.service.locator.attempts>3</nifi.cluster.protocol.multicast.service.locator.attempts>
<nifi.cluster.protocol.multicast.service.locator.attempts.delay>1 sec</nifi.cluster.protocol.multicast.service.locator.attempts.delay>
<!-- nifi.properties: cluster node properties (only configure for cluster
<!-- nifi.properties: cluster node properties (only configure for cluster
nodes) -->
<nifi.cluster.is.node>false</nifi.cluster.is.node>
<nifi.cluster.node.address />
@ -319,7 +324,7 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.node.unicast.manager.address />
<nifi.cluster.node.unicast.manager.protocol.port />
<!-- nifi.properties: cluster manager properties (only configure for cluster
<!-- nifi.properties: cluster manager properties (only configure for cluster
manager) -->
<nifi.cluster.is.manager>false</nifi.cluster.is.manager>
<nifi.cluster.manager.address />

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solr.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<!-- These Lucene deps should be brought in through solr-core, but there
appears to be an issue with 5.0.0 that still references some 4.10.3 poms -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/solr/solr.xml</exclude>
<exclude>src/test/resources/testCollection/core.properties</exclude>
<exclude>src/test/resources/testCollection/conf/_rest_managed.json</exclude>
<exclude>src/test/resources/testCollection/conf/protowords.txt</exclude>
<exclude>src/test/resources/testCollection/conf/schema.xml</exclude>
<exclude>src/test/resources/testCollection/conf/solrconfig.xml</exclude>
<exclude>src/test/resources/testCollection/conf/synonyms.txt</exclude>
<exclude>src/test/resources/testCollection/conf/lang/stopwords_en.txt</exclude>
<exclude>src/test/resources/testdata/test-csv-multiple-docs.csv</exclude>
<exclude>src/test/resources/testdata/test-custom-json-single-doc.json</exclude>
<exclude>src/test/resources/testdata/test-solr-json-multiple-docs.json</exclude>
<exclude>src/test/resources/testdata/test-xml-multiple-docs.xml</exclude>
<exclude>src/test/resources/log4j.properties</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"Apache", "Solr", "Get", "Pull"})
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
public class GetSolr extends SolrProcessor {
public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
.Builder().name("Solr Query")
.description("A query to execute against Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
.Builder().name("Return Fields")
.description("Comma-separated list of fields names to return")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor
.Builder().name("Sort Clause")
.description("A Solr sort clause, ex: field1 asc, field2 desc")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
.Builder().name("Date Field")
.description("The name of a date field in Solr used to filter results")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
.Builder().name("Batch Size")
.description("Number of rows per Solr query")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("100")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The results of querying Solr")
.build();
static final String FILE_PREFIX = "conf/.getSolr-";
static final String LAST_END_DATE = "LastEndDate";
static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final String UNINITIALIZED_LAST_END_DATE_VALUE;
static {
SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L));
}
final AtomicReference<String> lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE);
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private final Lock fileLock = new ReentrantLock();
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(SOLR_QUERY);
descriptors.add(RETURN_FIELDS);
descriptors.add(SORT_CLAUSE);
descriptors.add(DATE_FIELD);
descriptors.add(BATCH_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
}
@OnShutdown
public void onShutdown() {
writeLastEndDate();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
readLastEndDate();
final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String currDate = sdf.format(new Date());
final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
final String query = context.getProperty(SOLR_QUERY).getValue();
final SolrQuery solrQuery = new SolrQuery(query);
solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
// if initialized then apply a filter to restrict results from the last end time til now
if (initialized) {
StringBuilder filterQuery = new StringBuilder();
filterQuery.append(context.getProperty(DATE_FIELD).getValue())
.append(":{").append(lastEndDatedRef.get()).append(" TO ")
.append(currDate).append("]");
solrQuery.addFilterQuery(filterQuery.toString());
logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
}
final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
if (returnFields != null && !returnFields.trim().isEmpty()) {
for (String returnField : returnFields.trim().split("[,]")) {
solrQuery.addField(returnField.trim());
}
}
final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
for (String sortClause : fullSortClause.split("[,]")) {
String[] sortParts = sortClause.trim().split("[ ]");
solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
}
}
try {
// run the initial query and send out the first page of results
final StopWatch stopWatch = new StopWatch(true);
QueryResponse response = getSolrClient().query(solrQuery);
stopWatch.stop();
long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final SolrDocumentList documentList = response.getResults();
logger.info("Retrieved {} results from Solr for {} in {} ms",
new Object[] {documentList.getNumFound(), query, duration});
if (documentList != null && documentList.getNumFound() > 0) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
session.transfer(flowFile, REL_SUCCESS);
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
}
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
// if initialized then page through the results and send out each page
if (initialized) {
int endRow = response.getResults().size();
long totalResults = response.getResults().getNumFound();
while (endRow < totalResults) {
solrQuery.setStart(endRow);
stopWatch.start();
response = getSolrClient().query(solrQuery);
stopWatch.stop();
duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration});
flowFile = session.create();
flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
endRow += response.getResults().size();
}
}
}
lastEndDatedRef.set(currDate);
writeLastEndDate();
} catch (SolrServerException | IOException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e);
throw new ProcessException(e);
} catch (final Throwable t) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t);
throw t;
}
}
private void readLastEndDate() {
fileLock.lock();
File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
try (FileInputStream fis = new FileInputStream(lastEndDateCache)) {
Properties props = new Properties();
props.load(fis);
lastEndDatedRef.set(props.getProperty(LAST_END_DATE));
} catch (IOException swallow) {
} finally {
fileLock.unlock();
}
}
private void writeLastEndDate() {
fileLock.lock();
File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
Properties props = new Properties();
props.setProperty(LAST_END_DATE, lastEndDatedRef.get());
props.store(fos, "GetSolr LastEndDate value");
} catch (IOException e) {
getLogger().error("Failed to persist LastEndDate due to " + e, e);
} finally {
fileLock.unlock();
}
}
/**
* Writes each SolrDocument in XML format to the OutputStream.
*/
private class QueryResponseOutputStreamCallback implements OutputStreamCallback {
private QueryResponse response;
public QueryResponseOutputStreamCallback(QueryResponse response) {
this.response = response;
}
@Override
public void process(OutputStream out) throws IOException {
for (SolrDocument doc : response.getResults()) {
String xml = ClientUtils.toXML(ClientUtils.toSolrInputDocument(doc));
IOUtils.write(xml, out);
}
}
}
}

View File

@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@Tags({"Apache", "Solr", "Put", "Send"})
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
public class PutSolrContentStream extends SolrProcessor {
public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor
.Builder().name("Content Stream Path")
.description("The path in Solr to post the ContentStream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("/update/json/docs")
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor
.Builder().name("Content-Type")
.description("Content-Type being sent to Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("application/json")
.build();
public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
.Builder().name("Commit Within")
.description("The number of milliseconds before the given update is committed")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The original FlowFile")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed for any reason other than Solr being unreachable")
.build();
public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
.name("connection_failure")
.description("FlowFiles that failed because Solr is unreachable")
.build();
public static final String COLLECTION_PARAM_NAME = "collection";
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(CONTENT_STREAM_PATH);
descriptors.add(CONTENT_TYPE);
descriptors.add(COMMIT_WITHIN);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_CONNECTION_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final ObjectHolder<Exception> error = new ObjectHolder<>(null);
final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH)
.evaluateAttributeExpressions().getValue();
ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath);
request.setParams(new ModifiableSolrParams());
// add the extra params, don't use 'set' in case of repeating params
Iterator<String> paramNames = requestParams.getParameterNamesIterator();
while (paramNames.hasNext()) {
String paramName = paramNames.next();
for (String paramValue : requestParams.getParams(paramName)) {
request.getParams().add(paramName, paramValue);
}
}
// specify the collection for SolrCloud
if (isSolrCloud) {
request.setParam(COLLECTION_PARAM_NAME, collection);
}
if (commitWithin != null && commitWithin > 0) {
request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
}
try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
// add the FlowFile's content on the UpdateRequest
request.addContentStream(new ContentStreamBase() {
@Override
public InputStream getStream() throws IOException {
return bufferedIn;
}
@Override
public String getContentType() {
return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
}
});
UpdateResponse response = request.process(getSolrClient());
getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
} catch (SolrException e) {
error.set(e);
} catch (SolrServerException e) {
if (causedByIOException(e)) {
connectionError.set(e);
} else {
error.set(e);
}
} catch (IOException e) {
connectionError.set(e);
}
}
});
timer.stop();
if (error.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to failure",
new Object[]{flowFile, error.get()});
session.transfer(flowFile, REL_FAILURE);
} else if (connectionError.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
new Object[]{flowFile, connectionError.get()});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_CONNECTION_FAILURE);
} else {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (isSolrCloud) {
transitUri.append(":").append(collection);
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
session.transfer(flowFile, REL_SUCCESS);
}
}
private boolean causedByIOException(SolrServerException e) {
boolean foundIOException = false;
Throwable cause = e.getCause();
while (cause != null) {
if (cause instanceof IOException) {
foundIOException = true;
break;
}
cause = cause.getCause();
}
return foundIOException;
}
// get all of the dynamic properties and values into a Map for later adding to the Solr request
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
final Map<String,String[]> paramsMap = new HashMap<>();
final SortedMap<String,String> repeatingParams = new TreeMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
final String paramName = descriptor.getName();
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if (!paramValue.trim().isEmpty()) {
if (paramName.matches(REPEATING_PARAM_PATTERN)) {
repeatingParams.put(paramName, paramValue);
} else {
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
}
}
}
}
for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) {
final String paramName = entry.getKey();
final String paramValue = entry.getValue();
final int idx = paramName.lastIndexOf(".");
MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap);
}
return paramsMap;
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A base class for processors that interact with Apache Solr.
*
*/
public abstract class SolrProcessor extends AbstractProcessor {
public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
"Cloud", "Cloud", "A SolrCloud instance.");
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
.Builder().name("Solr Location")
.description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
.Builder().name("Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
private volatile SolrClient solrClient;
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
this.solrClient = createSolrClient(context);
}
/**
* Create a SolrClient based on the type of Solr specified.
*
* @param context
* The context
* @return an HttpSolrClient or CloudSolrClient
*/
protected SolrClient createSolrClient(final ProcessContext context) {
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
} else {
CloudSolrClient cloudSolrClient = new CloudSolrClient(
context.getProperty(SOLR_LOCATION).getValue());
cloudSolrClient.setDefaultCollection(
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
return cloudSolrClient;
}
}
/**
* Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
* {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} method
*
* @return an HttpSolrClient or CloudSolrClient
*/
protected final SolrClient getSolrClient() {
return solrClient;
}
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>();
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
final String collection = context.getProperty(COLLECTION).getValue();
if (collection == null || collection.trim().isEmpty()) {
problems.add(new ValidationResult.Builder()
.subject(COLLECTION.getName())
.input(collection).valid(false)
.explanation("A collection must specified for Solr Type of Cloud")
.build());
}
}
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
return problems;
}
/**
* Allows additional custom validation to be done. This will be called from
* the parent's customValidation method.
*
* @param context
* The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
return new ArrayList<>();
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.solr.PutSolrContentStream
org.apache.nifi.processors.solr.GetSolr

View File

@ -0,0 +1,48 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSolrContentStream</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor streams the contents of a FlowFile to an Apache Solr
update handler. Any properties added to this processor by the user are
passed to Solr on the update request. If a parameter must be sent multiple
times with different values, properties can follow a naming convention:
name.number, where name is the parameter name and number is a unique number.
Repeating parameters will be sorted by their property name.
</p>
<p>
Example: To specify multiple 'f' parameters for indexing custom json, the following
properties can be defined:
</p>
<ul>
<li><strong>split</strong>: /exams</li>
<li><strong>f.1</strong>: first:/first</li>
<li><strong>f.2</strong>: last:/last</li>
<li><strong>f.3</strong>: grade:/grade</li>
</ul>
<p>
This will result in sending the following url to Solr: </br>
split=/exams&f=first:/first&f=last:/last&f=grade:/grade
</p>
</body>
</html>

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* Helper to create EmbeddedSolrServer instances for testing.
*/
public class EmbeddedSolrServerFactory {
public static final String DEFAULT_SOLR_HOME = "src/test/resources/solr";
public static final String DEFAULT_CORE_HOME = "src/test/resources/";
public static final String DEFAULT_DATA_DIR = "target";
/**
* Use the defaults to create the core.
*
* @param coreName the name of the core
* @return an EmbeddedSolrServer for the given core
*/
public static SolrClient create(String coreName) throws IOException {
return create(DEFAULT_SOLR_HOME, DEFAULT_CORE_HOME,
coreName, DEFAULT_DATA_DIR);
}
/**
*
* @param solrHome
* path to directory where solr.xml lives
* @param coreName
* the name of the core to load
* @param dataDir
* the data dir for the core
*
* @return an EmbeddedSolrServer for the given core
*/
public static SolrClient create(String solrHome, String coreHome, String coreName, String dataDir)
throws IOException {
Properties props = new Properties();
if (dataDir != null) {
File coreDataDir = new File(dataDir + "/" + coreName);
if (coreDataDir.exists()) {
FileUtils.deleteDirectory(coreDataDir);
}
props.setProperty("dataDir", dataDir + "/" + coreName);
}
CoreContainer coreContainer = new CoreContainer(solrHome);
coreContainer.load();
CoreDescriptor descriptor = new CoreDescriptor(coreContainer, coreName,
new File(coreHome, coreName).getAbsolutePath(), props);
coreContainer.create(descriptor);
return new EmbeddedSolrServer(coreContainer, coreName);
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.Properties;
import java.util.TimeZone;
import static org.junit.Assert.assertTrue;
public class TestGetSolr {
static final String DEFAULT_SOLR_CORE = "testCollection";
private SolrClient solrClient;
@Before
public void setup() {
// create the conf dir if it doesn't exist
File confDir = new File("conf");
if (!confDir.exists()) {
confDir.mkdir();
}
try {
// create an EmbeddedSolrServer for the processor to use
String relPath = getClass().getProtectionDomain().getCodeSource()
.getLocation().getFile() + "../../target";
solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, DEFAULT_SOLR_CORE, relPath);
// create some test documents
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField("first", "bob");
doc1.addField("last", "smith");
doc1.addField("created", new Date());
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("first", "alice");
doc2.addField("last", "smith");
doc2.addField("created", new Date());
SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField("first", "mike");
doc3.addField("last", "smith");
doc3.addField("created", new Date());
SolrInputDocument doc4 = new SolrInputDocument();
doc4.addField("first", "john");
doc4.addField("last", "smith");
doc4.addField("created", new Date());
SolrInputDocument doc5 = new SolrInputDocument();
doc5.addField("first", "joan");
doc5.addField("last", "smith");
doc5.addField("created", new Date());
// add the test data to the index
solrClient.add(Arrays.asList(doc1, doc2, doc3, doc4, doc5));
solrClient.commit();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
@After
public void teardown() {
File confDir = new File("conf");
assertTrue(confDir.exists());
File[] files = confDir.listFiles();
assertTrue(files.length > 0);
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
}
assertTrue(confDir.delete());
try {
solrClient.shutdown();
} catch (Exception e) {
}
}
@Test
public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = TestRunners.newTestRunner(proc);
// setup a lastEndDate file to simulate picking up from a previous end date
SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
Calendar cal = new GregorianCalendar();
cal.add(Calendar.MINUTE, -30);
final String lastEndDate = sdf.format(cal.getTime());
File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
Properties props = new Properties();
props.setProperty(GetSolr.LAST_END_DATE, lastEndDate);
props.store(fos, "GetSolr LastEndDate value");
} catch (IOException e) {
Assert.fail("Failed to setup last end date value: " + e.getMessage());
}
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "2");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3);
}
@Test
public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
runner.setProperty(GetSolr.RETURN_FIELDS, "created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
}
@Test
public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz");
runner.setProperty(GetSolr.RETURN_FIELDS, "created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
}
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends GetSolr {
private SolrClient solrClient;
public TestableProcessor(SolrClient solrClient) {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrClient(ProcessContext context) {
return solrClient;
}
}
}

View File

@ -0,0 +1,416 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for PutSolr processor.
*/
public class TestPutSolrContentStream {
static final String DEFAULT_SOLR_CORE = "testCollection";
static final String CUSTOM_JSON_SINGLE_DOC_FILE = "src/test/resources/testdata/test-custom-json-single-doc.json";
static final String SOLR_JSON_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-solr-json-multiple-docs.json";
static final String CSV_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-csv-multiple-docs.csv";
static final String XML_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-xml-multiple-docs.xml";
static final SolrDocument expectedDoc1 = new SolrDocument();
static {
expectedDoc1.addField("first", "John");
expectedDoc1.addField("last", "Doe");
expectedDoc1.addField("grade", 8);
expectedDoc1.addField("subject", "Math");
expectedDoc1.addField("test", "term1");
expectedDoc1.addField("marks", 90);
}
static final SolrDocument expectedDoc2 = new SolrDocument();
static {
expectedDoc2.addField("first", "John");
expectedDoc2.addField("last", "Doe");
expectedDoc2.addField("grade", 8);
expectedDoc2.addField("subject", "Biology");
expectedDoc2.addField("test", "term1");
expectedDoc2.addField("marks", 86);
}
/**
* Creates a base TestRunner with Solr Type of standard.
*/
private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) {
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
return runner;
}
@Test
public void testUpdateWithSolrJson() throws IOException, SolrServerException {
final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
runner.setProperty("json.command", "false");
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try {
proc.getSolrClient().close();
} catch (Exception e) {
}
}
}
@Test
public void testUpdateWithCustomJson() throws IOException, SolrServerException {
final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
runner.setProperty("split", "/exams");
runner.setProperty("f.1", "first:/first");
runner.setProperty("f.2", "last:/last");
runner.setProperty("f.3", "grade:/grade");
runner.setProperty("f.4", "subject:/exams/subject");
runner.setProperty("f.5", "test:/exams/test");
runner.setProperty("f.6", "marks:/exams/marks");
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try {
proc.getSolrClient().close();
} catch (Exception e) {
}
}
}
@Test
public void testUpdateWithCsv() throws IOException, SolrServerException {
final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv");
runner.setProperty("fieldnames", "first,last,grade,subject,test,marks");
try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try {
proc.getSolrClient().close();
} catch (Exception e) {
}
}
}
@Test
public void testUpdateWithXml() throws IOException, SolrServerException {
final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update");
runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml");
try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try {
proc.getSolrClient().close();
} catch (Exception e) {
}
}
}
@Test
public void testDeleteWithXml() throws IOException, SolrServerException {
final SolrClient solrClient = createEmbeddedSolrClient(DEFAULT_SOLR_CORE);
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update");
runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml");
runner.setProperty("commit", "true");
// add a document so there is something to delete
SolrInputDocument doc = new SolrInputDocument();
doc.addField("first", "bob");
doc.addField("last", "smith");
doc.addField("created", new Date());
solrClient.add(doc);
solrClient.commit();
// prove the document got added
SolrQuery query = new SolrQuery("*:*");
QueryResponse qResponse = solrClient.query(query);
Assert.assertEquals(1, qResponse.getResults().getNumFound());
// run the processor with a delete-by-query command
runner.enqueue("<delete><query>first:bob</query></delete>".getBytes("UTF-8"));
runner.run();
// prove the document got deleted
qResponse = solrClient.query(query);
Assert.assertEquals(0, qResponse.getResults().getNumFound());
}
@Test
public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrServerException("Invalid Document");
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
}
}
@Test
public void testSolrServerExceptionCausedByIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrServerException(new IOException("Error communicating with Solr"));
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
}
}
@Test
public void testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error");
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
}
}
@Test
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new HttpSolrClient.RemoteSolrException(
"host", 401, "error", new NumberFormatException());
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
}
}
@Test
public void testIOExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
final Throwable throwable = new IOException("Error communicating with Solr");
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class), eq((String)null));
}
}
@Test
public void testSolrTypeCloudShouldRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertNotValid();
runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1");
runner.assertValid();
}
@Test
public void testSolrTypeStandardShouldNotRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
}
// Override the createSolrClient method to inject a Mock.
private class ExceptionThrowingProcessor extends PutSolrContentStream {
private SolrClient mockSolrClient;
private Throwable throwable;
public ExceptionThrowingProcessor(Throwable throwable) {
this.throwable = throwable;
}
@Override
protected SolrClient createSolrClient(ProcessContext context) {
mockSolrClient = Mockito.mock(SolrClient.class);
try {
when(mockSolrClient.request(any(SolrRequest.class),
eq((String)null))).thenThrow(throwable);
} catch (SolrServerException e) {
Assert.fail(e.getMessage());
} catch (IOException e) {
Assert.fail(e.getMessage());
}
return mockSolrClient;
}
}
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends PutSolrContentStream {
private SolrClient solrClient;
public TestableProcessor(SolrClient solrClient) {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrClient(ProcessContext context) {
return solrClient;
}
}
// Create an EmbeddedSolrClient with the given core name.
private static SolrClient createEmbeddedSolrClient(String coreName) throws IOException {
String relPath = TestPutSolrContentStream.class.getProtectionDomain()
.getCodeSource().getLocation().getFile()
+ "../../target";
return EmbeddedSolrServerFactory.create(
EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
EmbeddedSolrServerFactory.DEFAULT_CORE_HOME,
coreName, relPath);
}
/**
* Verify that given SolrServer contains the expected SolrDocuments.
*/
private static void verifySolrDocuments(SolrClient solrServer, Collection<SolrDocument> expectedDocuments)
throws IOException, SolrServerException {
solrServer.commit();
SolrQuery query = new SolrQuery("*:*");
QueryResponse qResponse = solrServer.query(query);
Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound());
// verify documents have expected fields and values
for (SolrDocument expectedDoc : expectedDocuments) {
boolean found = false;
for (SolrDocument solrDocument : qResponse.getResults()) {
boolean foundAllFields = true;
for (String expectedField : expectedDoc.getFieldNames()) {
Object expectedVal = expectedDoc.getFirstValue(expectedField);
Object actualVal = solrDocument.getFirstValue(expectedField);
foundAllFields = expectedVal.equals(actualVal);
}
if (foundAllFields) {
found = true;
break;
}
}
Assert.assertTrue("Could not find " + expectedDoc, found);
}
}
}

View File

@ -0,0 +1,14 @@
# Logging level
solr.log=logs/
log4j.rootLogger=INFO, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.org.apache.hadoop=WARN
# set to INFO to enable infostream log messages
log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8" ?>
<solr>
<solrcloud>
<str name="host">${host:}</str>
<int name="hostPort">${jetty.port:8983}</int>
<str name="hostContext">${hostContext:solr}</str>
<int name="zkClientTimeout">${zkClientTimeout:30000}</int>
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
</solrcloud>
<shardHandlerFactory name="shardHandlerFactory"
class="HttpShardHandlerFactory">
<int name="socketTimeout">${socketTimeout:0}</int>
<int name="connTimeout">${connTimeout:0}</int>
</shardHandlerFactory>
</solr>

View File

@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# a couple of test stopwords to test that the words are really being
# configured from this file:
stopworda
stopwordb
# Standard english stop words taken from Lucene's StopAnalyzer
a
an
and
are
as
at
be
but
by
for
if
in
into
is
it
no
not
of
on
or
such
that
the
their
then
there
these
they
this
to
was
will
with

View File

@ -0,0 +1,21 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-----------------------------------------------------------------------
# Use a protected word file to protect against the stemmer reducing two
# unrelated words to the same base word.
# Some non-words that normally won't be encountered,
# just to test that they won't be stemmed.
dontstems
zwhacky

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<schema version="1.5" name="testCollection">
<fieldType name="string" class="solr.StrField"/>
<fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="first" type="string" indexed="true" stored="true" />
<field name="last" type="string" indexed="true" stored="true" />
<field name="grade" type="int" indexed="true" stored="true" />
<field name="marks" type="int" indexed="true" stored="true" />
<field name="test" type="string" indexed="true" stored="true" />
<field name="subject" type="string" indexed="true" stored="true" />
<field name="created" type="date" indexed="true" stored="true" />
</schema>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" ?>
<config>
<luceneMatchVersion>5.0.0</luceneMatchVersion>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<indexConfig>
<lockType>single</lockType>
</indexConfig>
<requestDispatcher handleSelect="false">
<httpCaching never304="true" />
</requestDispatcher>
<requestHandler name="/select" class="solr.SearchHandler" />
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
</config>

View File

@ -0,0 +1,29 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-----------------------------------------------------------------------
#some test synonym mappings unlikely to appear in real input text
aaafoo => aaabar
bbbfoo => bbbfoo bbbbar
cccfoo => cccbar cccbaz
fooaaa,baraaa,bazaaa
# Some synonym groups specific to this example
GB,gib,gigabyte,gigabytes
MB,mib,megabyte,megabytes
Television, Televisions, TV, TVs
#notice we use "gib" instead of "GiB" so any WordDelimiterFilter coming
#after us won't split it into two words.
# Synonym mappings can be used for spelling correction too
pixima => pixma

View File

@ -0,0 +1,2 @@
John,Doe,8,Math,term1,90
John,Doe,8,Biology,term1,86
1 John Doe 8 Math term1 90
2 John Doe 8 Biology term1 86

View File

@ -0,0 +1,15 @@
{
"first": "John",
"last": "Doe",
"grade": 8,
"exams": [
{
"subject": "Math",
"test" : "term1",
"marks":90},
{
"subject": "Biology",
"test" : "term1",
"marks":86}
]
}

View File

@ -0,0 +1,18 @@
[
{
"first": "John",
"last": "Doe",
"grade": 8,
"subject": "Math",
"test" : "term1",
"marks": 90
},
{
"first": "John",
"last": "Doe",
"grade": 8,
"subject": "Biology",
"test" : "term1",
"marks": 86
}
]

View File

@ -0,0 +1,18 @@
<add>
<doc>
<field name="first">John</field>
<field name="last">Doe</field>
<field name="grade">8</field>
<field name="subject">Math</field>
<field name="test">term1</field>
<field name="marks">90</field>
</doc>
<doc>
<field name="first">John</field>
<field name="last">Doe</field>
<field name="grade">8</field>
<field name="subject">Biology</field>
<field name="test">term1</field>
<field name="marks">86</field>
</doc>
</add>

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-bundle</artifactId>
<packaging>pom</packaging>
<description>A bundle of processors that can store and retrieve data from Apache Solr</description>
<properties>
<solr.version>5.1.0</solr.version>
</properties>
<modules>
<module>nifi-solr-processors</module>
<module>nifi-solr-nar</module>
</modules>
</project>

View File

@ -34,6 +34,7 @@
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module>
<module>nifi-solr-bundle</module>
<module>nifi-aws-bundle</module>
<module>nifi-social-media-bundle</module>
<module>nifi-geo-bundle</module>

View File

@ -124,12 +124,12 @@
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
<version>3.5.2</version>
</dependency>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
@ -179,7 +179,7 @@
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.6.0-4</version>
</dependency>
</dependency>
<dependency>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
@ -190,10 +190,10 @@
<artifactId>quartz</artifactId>
<version>2.2.1</version>
<exclusions>
<!--
<!--
| Exclude the quartz 2.2.1 bundled version of c3p0 because it is lgpl licensed
| We also don't use the JDBC related features of quartz for which the dependency would matter
-->
-->
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
@ -262,7 +262,7 @@
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies
<!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies
section -->
<exclusion>
<groupId>commons-logging</groupId>
@ -429,7 +429,7 @@
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@ -475,7 +475,7 @@
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
@ -520,7 +520,7 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>${jersey.version}</version>
</dependency>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
@ -585,7 +585,7 @@
<artifactId>jetty-jsp-jdt</artifactId>
<version>2.3.3</version>
<scope>provided</scope>
</dependency>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -743,6 +743,12 @@
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-nar</artifactId>
@ -788,7 +794,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-logging-utils</artifactId>
@ -838,7 +844,7 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</dependencies>
<build>
<pluginManagement>
<plugins>