Adding nifi-solr-bundle with processors for adding and retrieving data from Apache Solr

bbende 2015-03-22 18:59:22 -04:00
parent 0bd27847ab
commit 71b6ffc958
28 changed files with 1913 additions and 31 deletions

View File

@ -45,7 +45,7 @@
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
@ -166,9 +166,14 @@
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>
<!--Wrapper Properties-->
<nifi.wrapper.jvm.heap.initial.mb>256</nifi.wrapper.jvm.heap.initial.mb>
<nifi.wrapper.jvm.heap.max.mb>512</nifi.wrapper.jvm.heap.max.mb>
@ -176,7 +181,7 @@
<nifi.max.permgen.size.mb>128</nifi.max.permgen.size.mb>
<nifi.wrapper.logfile.maxsize>10m</nifi.wrapper.logfile.maxsize>
<nifi.wrapper.logfile.maxfiles>10</nifi.wrapper.logfile.maxfiles>
<!-- nifi.properties: core properties -->
<nifi.version>${project.version}</nifi.version>
<nifi.flowcontroller.autoResumeState>true</nifi.flowcontroller.autoResumeState>
@ -204,7 +209,7 @@
<nifi.swap.in.threads>1</nifi.swap.in.threads>
<nifi.swap.out.period>5 sec</nifi.swap.out.period>
<nifi.swap.out.threads>4</nifi.swap.out.threads>
<nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
<nifi.content.claim.max.appendable.size>10 MB</nifi.content.claim.max.appendable.size>
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
@ -214,21 +219,21 @@
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
<nifi.content.viewer.url />
<nifi.restore.directory />
<nifi.ui.banner.text />
<nifi.ui.autorefresh.interval>30 sec</nifi.ui.autorefresh.interval>
<nifi.nar.library.directory>./lib</nifi.nar.library.directory>
<nifi.nar.working.directory>./work/nar/</nifi.nar.working.directory>
<nifi.documentation.working.directory>./work/docs/components</nifi.documentation.working.directory>
<nifi.sensitive.props.algorithm>PBEWITHMD5AND256BITAES-CBC-OPENSSL</nifi.sensitive.props.algorithm>
<nifi.sensitive.props.provider>BC</nifi.sensitive.props.provider>
<nifi.h2.url.append>;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE</nifi.h2.url.append>
<nifi.remote.input.socket.port>9990</nifi.remote.input.socket.port>
<!-- persistent provenance repository properties -->
<nifi.provenance.repository.implementation>org.apache.nifi.provenance.PersistentProvenanceRepository</nifi.provenance.repository.implementation>
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
@ -243,15 +248,15 @@
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
<!-- volatile provenance repository properties -->
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>
<!-- Component status repository properties -->
<nifi.components.status.repository.implementation>org.apache.nifi.controller.status.history.VolatileComponentStatusRepository</nifi.components.status.repository.implementation>
<nifi.components.status.repository.buffer.size>288</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>5 mins</nifi.components.status.snapshot.frequency>
<!-- nifi.properties: web properties -->
<nifi.web.war.directory>./lib</nifi.web.war.directory>
<nifi.web.http.host />
@ -260,7 +265,7 @@
<nifi.web.https.port />
<nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>
<nifi.web.jetty.threads>200</nifi.web.jetty.threads>
<!-- nifi.properties: security properties -->
<nifi.security.keystore />
<nifi.security.keystoreType />
@ -277,12 +282,12 @@
<nifi.security.support.new.account.requests />
<nifi.security.ocsp.responder.url />
<nifi.security.ocsp.responder.certificate />
<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
<nifi.cluster.protocol.socket.timeout>30 sec</nifi.cluster.protocol.socket.timeout>
<nifi.cluster.protocol.connection.handshake.timeout>45 sec</nifi.cluster.protocol.connection.handshake.timeout>
<nifi.cluster.protocol.use.multicast>false</nifi.cluster.protocol.use.multicast>
<nifi.cluster.protocol.multicast.address />
<nifi.cluster.protocol.multicast.port />
@ -297,7 +302,7 @@
<nifi.cluster.node.protocol.threads>2</nifi.cluster.node.protocol.threads>
<nifi.cluster.node.unicast.manager.address />
<nifi.cluster.node.unicast.manager.protocol.port />
<!-- nifi.properties: cluster manager properties (only configure for cluster manager) -->
<nifi.cluster.is.manager>false</nifi.cluster.is.manager>
<nifi.cluster.manager.address />
@ -349,7 +354,7 @@
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>rpm-maven-plugin</artifactId>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solr.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<!-- These Lucene deps should be brought in through solr-core, but there
appears to be an issue with 5.0.0 that still references some 4.10.3 poms -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,310 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"Apache", "Solr", "Get", "Pull"})
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
public class GetSolr extends SolrProcessor {
public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
.Builder().name("Solr Query")
.description("A query to execute against Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
.Builder().name("Return Fields")
.description("Comma-separated list of fields names to return")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor
.Builder().name("Sort Clause")
.description("A Solr sort clause, ex: field1 asc, field2 desc")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
.Builder().name("Date Field")
.description("The name of a date field in Solr used to filter results")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
.Builder().name("Batch Size")
.description("Number of rows per Solr query")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("100")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The results of querying Solr")
.build();
static final String FILE_PREFIX = "conf/.getSolr-";
static final String LAST_END_DATE = "LastEndDate";
static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final String UNINITIALIZED_LAST_END_DATE_VALUE;
static {
SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L));
}
final AtomicReference<String> lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE);
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private final Lock fileLock = new ReentrantLock();
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(DEFAULT_COLLECTION);
descriptors.add(SOLR_QUERY);
descriptors.add(RETURN_FIELDS);
descriptors.add(SORT_CLAUSE);
descriptors.add(DATE_FIELD);
descriptors.add(BATCH_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
}
@OnShutdown
public void onShutdown() {
writeLastEndDate();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
final FlowFile incomingFlowFile = session.get();
if (incomingFlowFile != null) {
session.transfer(incomingFlowFile, REL_SUCCESS);
logger.warn("found FlowFile {} in input queue; transferring to success",
new Object[]{incomingFlowFile});
}
readLastEndDate();
final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
final String currDate = sdf.format(new Date());
final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
final String query = context.getProperty(SOLR_QUERY).getValue();
final SolrQuery solrQuery = new SolrQuery(query);
solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
// if initialized then apply a filter to restrict results from the last end time until now
if (initialized) {
StringBuilder filterQuery = new StringBuilder();
filterQuery.append(context.getProperty(DATE_FIELD).getValue())
.append(":{").append(lastEndDatedRef.get()).append(" TO ")
.append(currDate).append("]");
solrQuery.addFilterQuery(filterQuery.toString());
logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
}
final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
if (returnFields != null && !returnFields.trim().isEmpty()) {
for (String returnField : returnFields.trim().split("[,]")) {
solrQuery.addField(returnField.trim());
}
}
final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
for (String sortClause : fullSortClause.split("[,]")) {
String[] sortParts = sortClause.trim().split("[ ]");
solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
}
}
try {
// run the initial query and send out the first page of results
final StopWatch stopWatch = new StopWatch(true);
QueryResponse response = getSolrServer().query(solrQuery);
stopWatch.stop();
long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final SolrDocumentList documentList = response.getResults();
// check for a null result list before dereferencing it
if (documentList != null && documentList.getNumFound() > 0) {
logger.info("Retrieved {} results from Solr for {} in {} ms",
new Object[] {documentList.getNumFound(), query, duration});
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
session.transfer(flowFile, REL_SUCCESS);
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue());
}
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
// if initialized then page through the results and send out each page
if (initialized) {
int endRow = response.getResults().size();
long totalResults = response.getResults().getNumFound();
while (endRow < totalResults) {
solrQuery.setStart(endRow);
stopWatch.start();
response = getSolrServer().query(solrQuery);
stopWatch.stop();
duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration});
flowFile = session.create();
flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
endRow += response.getResults().size();
}
}
}
lastEndDatedRef.set(currDate);
writeLastEndDate();
} catch (SolrServerException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e);
throw new ProcessException(e);
} catch (final Throwable t) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t);
throw t;
}
}
private void readLastEndDate() {
fileLock.lock();
File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
try (FileInputStream fis = new FileInputStream(lastEndDateCache)) {
Properties props = new Properties();
props.load(fis);
lastEndDatedRef.set(props.getProperty(LAST_END_DATE));
} catch (IOException swallow) {
// the cache file may not exist yet (e.g. on the first run); keep the current value
} finally {
fileLock.unlock();
}
}
private void writeLastEndDate() {
fileLock.lock();
File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
Properties props = new Properties();
props.setProperty(LAST_END_DATE, lastEndDatedRef.get());
props.store(fos, "GetSolr LastEndDate value");
} catch (IOException e) {
getLogger().error("Failed to persist LastEndDate due to " + e, e);
} finally {
fileLock.unlock();
}
}
/**
* Writes each SolrDocument in XML format to the OutputStream.
*/
private class QueryResponseOutputStreamCallback implements OutputStreamCallback {
private QueryResponse response;
public QueryResponseOutputStreamCallback(QueryResponse response) {
this.response = response;
}
@Override
public void process(OutputStream out) throws IOException {
for (SolrDocument doc : response.getResults()) {
String xml = ClientUtils.toXML(ClientUtils.toSolrInputDocument(doc));
IOUtils.write(xml, out);
}
}
}
}
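
For reference, the date-window filter assembled in onTrigger above uses Solr range syntax with an exclusive lower bound and an inclusive upper bound, so a document stamped exactly at the previous end time is not fetched twice. A minimal sketch of an equivalent query, with illustrative field names and timestamps:

SolrQuery solrQuery = new SolrQuery("last:smith");
solrQuery.setRows(100);
// {lower TO upper] excludes the previous end time and includes the current one
solrQuery.addFilterQuery("created:{2015-03-22T18:00:00.000Z TO 2015-03-22T18:30:00.000Z]");
solrQuery.addSort("created", SolrQuery.ORDER.desc);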

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Tags({"Apache", "Solr", "Put", "Send"})
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
public class PutSolrContentStream extends SolrProcessor {
public static final PropertyDescriptor CONTENT_STREAM_URL = new PropertyDescriptor
.Builder().name("Content Stream URL")
.description("The URL in Solr to post the ContentStream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("/update/json/docs")
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor
.Builder().name("Content-Type")
.description("Content-Type being sent to Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("application/json")
.build();
public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor
.Builder().name("Request Parameters")
.description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2")
.required(false)
.addValidator(RequestParamsUtil.getValidator())
.defaultValue("json.command=false&split=/&f=id:/field1")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original FlowFile")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed for any reason other than Solr being unreachable")
.build();
public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
.name("connection_failure")
.description("FlowFiles that failed because Solr is unreachable")
.build();
/**
* The name of a FlowFile attribute used for specifying a Solr collection.
*/
public static final String SOLR_COLLECTION_ATTR = "solr.collection";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private volatile MultiMapSolrParams requestParams;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(DEFAULT_COLLECTION);
descriptors.add(CONTENT_STREAM_URL);
descriptors.add(CONTENT_TYPE);
descriptors.add(REQUEST_PARAMS);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
relationships.add(REL_CONNECTION_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected void additionalOnScheduled(ProcessContext context) {
final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue();
this.requestParams = RequestParamsUtil.parse(requestParamsVal);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final ObjectHolder<SolrException> error = new ObjectHolder<>(null);
final ObjectHolder<SolrServerException> connectionError = new ObjectHolder<>(null);
final ObjectHolder<String> collectionUsed = new ObjectHolder<>(null);
final String collectionAttrVal = flowFile.getAttribute(SOLR_COLLECTION_ATTR);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(
context.getProperty(CONTENT_STREAM_URL).getValue());
request.setParams(new ModifiableSolrParams());
// add the extra params, don't use 'set' in case of repeating params
Iterator<String> paramNames = requestParams.getParameterNamesIterator();
while (paramNames.hasNext()) {
String paramName = paramNames.next();
for (String paramValue : requestParams.getParams(paramName)) {
request.getParams().add(paramName, paramValue);
}
}
// send the request to the specified collection, or to the default collection
if (isSolrCloud) {
String collection = collectionAttrVal;
if (StringUtils.isBlank(collection)) {
collection = context.getProperty(DEFAULT_COLLECTION).getValue();
}
request.setParam("collection", collection);
collectionUsed.set(collection);
}
try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
// add the FlowFile's content on the UpdateRequest
request.addContentStream(new ContentStreamBase() {
@Override
public InputStream getStream() throws IOException {
return bufferedIn;
}
@Override
public String getContentType() {
return context.getProperty(CONTENT_TYPE).getValue();
}
});
UpdateResponse response = request.process(getSolrServer());
getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
} catch (SolrException e) {
error.set(e);
} catch (SolrServerException e) {
connectionError.set(e);
}
}
});
timer.stop();
if (error.get() != null) {
getLogger().error("Failed to send {} to Solr due to {} with status code {}; routing to failure",
new Object[]{flowFile, error.get(), error.get().code()});
session.transfer(flowFile, REL_FAILURE);
} else if (connectionError.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
new Object[]{flowFile, connectionError.get()});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_CONNECTION_FAILURE);
} else {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (isSolrCloud) {
transitUri.append(":").append(collectionUsed.get());
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
session.transfer(flowFile, REL_ORIGINAL);
}
}
}
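
A minimal usage sketch with the nifi-mock TestRunner (as the tests later in this commit do), showing the solr.collection FlowFile attribute overriding the Default Collection property; the location and collection names are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:2181"); // ZooKeeper hosts for a Solr Type of Cloud
runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "defaultCollection");
// the solr.collection attribute takes precedence over Default Collection for this FlowFile
Map<String, String> attributes = Collections.singletonMap(
PutSolrContentStream.SOLR_COLLECTION_ATTR, "otherCollection");
runner.enqueue("{\"id\":\"1\"}".getBytes(StandardCharsets.UTF_8), attributes);
runner.run();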

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.solr.common.params.MultiMapSolrParams;
import java.util.HashMap;
import java.util.Map;
public class RequestParamsUtil {
/**
* Parses a String of request params into a MultiMap.
*
* @param requestParams
* the value of the request params property
* @return a MultiMapSolrParams containing the parsed parameters
*/
public static MultiMapSolrParams parse(final String requestParams) {
final Map<String,String[]> paramsMap = new HashMap<>();
if (requestParams == null || requestParams.trim().isEmpty()) {
return new MultiMapSolrParams(paramsMap);
}
final String[] params = requestParams.split("[&]");
if (params == null || params.length == 0) {
throw new IllegalStateException(
"Parameters must be in form k1=v1&k2=v2, was" + requestParams);
}
for (final String param : params) {
final String[] keyVal = param.split("=");
if (keyVal.length != 2) {
throw new IllegalStateException(
"Parameter must be in form key=value, was " + param);
}
final String key = keyVal[0].trim();
final String val = keyVal[1].trim();
MultiMapSolrParams.addParam(key, val, paramsMap);
}
return new MultiMapSolrParams(paramsMap);
}
/**
* Creates a property validator for a request params string.
*
* @return valid if the input parses successfully, invalid otherwise
*/
public static Validator getValidator() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
try {
RequestParamsUtil.parse(input);
return new ValidationResult.Builder().subject(subject).input(input)
.explanation("Valid Params").valid(true).build();
} catch (final Exception e) {
return new ValidationResult.Builder().subject(subject).input(input)
.explanation("Invalid Params" + e.getMessage()).valid(false).build();
}
}
};
}
}
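
A brief sketch of the parser's behavior; note that repeated keys accumulate into a multi-valued parameter, which is why PutSolrContentStream adds rather than sets each value:

MultiMapSolrParams params = RequestParamsUtil.parse("split=/exams&f=first:/first&f=last:/last");
String split = params.get("split"); // "/exams"
String[] fields = params.getParams("f"); // ["first:/first", "last:/last"]
RequestParamsUtil.parse("a=1&b"); // throws IllegalStateException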

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A base class for processors that interact with Apache Solr.
*
*/
public abstract class SolrProcessor extends AbstractProcessor {
public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
"Cloud", "Cloud", "A SolrCloud instance.");
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor
.Builder().name("Solr Location")
.description("The Solr url for a Solr Type of Standard, " +
"or the ZooKeeper hosts for a Solr Type of Cloud.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DEFAULT_COLLECTION = new PropertyDescriptor
.Builder().name("Default Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile SolrClient solrServer;
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
this.solrServer = createSolrServer(context);
additionalOnScheduled(context);
}
/**
* Create a SolrServer based on the type of Solr specified.
*
* @param context
* The context
* @return an HttpSolrClient or CloudSolrClient
*/
protected SolrClient createSolrServer(final ProcessContext context) {
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
} else {
CloudSolrClient cloudSolrServer = new CloudSolrClient(
context.getProperty(SOLR_LOCATION).getValue());
cloudSolrServer.setDefaultCollection(
context.getProperty(DEFAULT_COLLECTION).getValue());
return cloudSolrServer;
}
}
/**
* Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
* {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} method
*
* @return the SolrClient created when the processor was scheduled
*/
protected final SolrClient getSolrServer() {
return solrServer;
}
/**
* Allows additional action to be taken during scheduling of processor.
*
* @param context
* The context
*/
protected void additionalOnScheduled(final ProcessContext context) {
}
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>();
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
final String collection = context.getProperty(DEFAULT_COLLECTION).getValue();
if (collection == null || collection.trim().isEmpty()) {
problems.add(new ValidationResult.Builder()
.subject(DEFAULT_COLLECTION.getName())
.input(collection).valid(false)
.explanation("A collection must specified for Solr Type of Cloud")
.build());
}
}
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
return problems;
}
/**
* Allows additional custom validation to be done. This will be called from
* the parent's customValidation method.
*
* @param context
* The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
return new ArrayList<>();
}
}
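
The extension points above can be exercised by a subclass along these lines; MySolrProcessor is a hypothetical illustration (imports as in SolrProcessor, plus ProcessSession and ProcessException), not part of this commit:

public class MySolrProcessor extends SolrProcessor {
@Override
protected void additionalOnScheduled(ProcessContext context) {
// called after createSolrServer(context), so getSolrServer() is already usable here
}
@Override
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
return new ArrayList<>(); // add processor-specific validation problems here
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// issue requests through getSolrServer()
}
}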

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.solr.PutSolrContentStream
org.apache.nifi.processors.solr.GetSolr

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
/**
* Helper to create EmbeddedSolrServer instances for testing.
*/
public class EmbeddedSolrServerFactory {
public static final String DEFAULT_SOLR_HOME = "src/test/resources/solr";
public static final String DEFAULT_CORE_HOME = "src/test/resources/";
public static final String DEFAULT_DATA_DIR = "target";
/**
* Use the defaults to create the core.
*
* @param coreName the name of the core to create
* @return an EmbeddedSolrServer for the given core
*/
public static SolrClient create(String coreName) throws IOException {
return create(DEFAULT_SOLR_HOME, DEFAULT_CORE_HOME,
coreName, DEFAULT_DATA_DIR);
}
/**
*
* @param solrHome
* path to directory where solr.xml lives
* @param coreHome
* path to the directory containing the core directory
* @param coreName
* the name of the core to load
* @param dataDir
* the data dir for the core
*
* @return an EmbeddedSolrServer for the given core
*/
public static SolrClient create(String solrHome, String coreHome, String coreName, String dataDir)
throws IOException {
Properties props = new Properties();
if (dataDir != null) {
File coreDataDir = new File(dataDir + "/" + coreName);
if (coreDataDir.exists()) {
FileUtils.deleteDirectory(coreDataDir);
}
props.setProperty("dataDir", dataDir + "/" + coreName);
}
CoreContainer coreContainer = new CoreContainer(solrHome);
coreContainer.load();
CoreDescriptor descriptor = new CoreDescriptor(coreContainer, coreName,
new File(coreHome, coreName).getAbsolutePath(), props);
coreContainer.create(descriptor);
return new EmbeddedSolrServer(coreContainer, coreName);
}
}
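
A small round trip against the factory's embedded server (solrj imports as used elsewhere in this commit), assuming a core configuration named testCollection under src/test/resources, the same layout the tests below rely on:

SolrClient solr = EmbeddedSolrServerFactory.create("testCollection");
SolrInputDocument doc = new SolrInputDocument();
doc.addField("first", "jane");
doc.addField("last", "smith");
doc.addField("created", new Date());
solr.add(doc);
solr.commit();
QueryResponse response = solr.query(new SolrQuery("last:smith"));
System.out.println(response.getResults().getNumFound()); // expect 1 for a fresh data dir
solr.shutdown();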

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.junit.Assert;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.junit.Test;
public class RequestParamsUtilTest {
@Test
public void testSimpleParse() {
MultiMapSolrParams map = RequestParamsUtil.parse("a=1&b=2&c=3");
Assert.assertEquals("1", map.get("a"));
Assert.assertEquals("2", map.get("b"));
Assert.assertEquals("3", map.get("c"));
}
@Test
public void testParseWithSpaces() {
MultiMapSolrParams map = RequestParamsUtil.parse("a = 1 &b= 2& c= 3 ");
Assert.assertEquals("1", map.get("a"));
Assert.assertEquals("2", map.get("b"));
Assert.assertEquals("3", map.get("c"));
}
@Test(expected = IllegalStateException.class)
public void testMalformedParamsParse() {
RequestParamsUtil.parse("a=1&b&c=3");
}
}

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import static org.junit.Assert.assertTrue;
public class TestGetSolr {
static final String DEFAULT_SOLR_CORE = "testCollection";
private SolrClient solrClient;
@Before
public void setup() {
// create the conf dir if it doesn't exist
File confDir = new File("conf");
if (!confDir.exists()) {
confDir.mkdir();
}
try {
// create an EmbeddedSolrServer for the processor to use
String relPath = getClass().getProtectionDomain().getCodeSource()
.getLocation().getFile() + "../../target";
solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, DEFAULT_SOLR_CORE, relPath);
// create some test documents
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField("first", "bob");
doc1.addField("last", "smith");
doc1.addField("created", new Date());
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("first", "alice");
doc2.addField("last", "smith");
doc2.addField("created", new Date());
SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField("first", "mike");
doc3.addField("last", "smith");
doc3.addField("created", new Date());
SolrInputDocument doc4 = new SolrInputDocument();
doc4.addField("first", "john");
doc4.addField("last", "smith");
doc4.addField("created", new Date());
SolrInputDocument doc5 = new SolrInputDocument();
doc5.addField("first", "joan");
doc5.addField("last", "smith");
doc5.addField("created", new Date());
// add the test data to the index
solrClient.add(doc1);
solrClient.add(doc2);
solrClient.add(doc3);
solrClient.add(doc4);
solrClient.add(doc5);
solrClient.commit();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
@After
public void teardown() {
File confDir = new File("conf");
assertTrue(confDir.exists());
File[] files = confDir.listFiles();
assertTrue(files.length > 0);
for (File file : files) {
assertTrue("Failed to delete " + file.getName(), file.delete());
}
assertTrue(confDir.delete());
try {
solrClient.shutdown();
} catch (Exception e) {
}
}
@Test
public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
final TestRunner runner = TestRunners.newTestRunner(proc);
// setup a lastEndDate file to simulate picking up from a previous end date
SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
Calendar cal = new GregorianCalendar();
cal.add(Calendar.MINUTE, -30);
final String lastEndDate = sdf.format(cal.getTime());
File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
Properties props = new Properties();
props.setProperty(GetSolr.LAST_END_DATE, lastEndDate);
props.store(fos, "GetSolr LastEndDate value");
} catch (IOException e) {
Assert.fail("Failed to setup last end date value: " + e.getMessage());
}
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "2");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3);
}
@Test
public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
runner.setProperty(GetSolr.RETURN_FIELDS, "created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
}
@Test
public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz");
runner.setProperty(GetSolr.RETURN_FIELDS, "created");
runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
runner.setProperty(GetSolr.DATE_FIELD, "created");
runner.setProperty(GetSolr.BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
}
// Override createSolrServer and return the passed in SolrClient
private class TestableProcessor extends GetSolr {
private SolrClient solrClient;
public TestableProcessor(SolrClient solrClient) {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
return solrClient;
}
}
}

View File

@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.solr;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.*;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import static org.mockito.Mockito.*;
/**
* Tests for the PutSolrContentStream processor.
*/
public class TestPutSolrContentStream {
static final String DEFAULT_SOLR_CORE = "testCollection";
static final String CUSTOM_JSON_SINGLE_DOC_FILE = "src/test/resources/testdata/test-custom-json-single-doc.json";
static final String SOLR_JSON_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-solr-json-multiple-docs.json";
static final String CSV_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-csv-multiple-docs.csv";
static final String XML_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-xml-multiple-docs.xml";
static final SolrDocument expectedDoc1 = new SolrDocument();
static {
expectedDoc1.addField("first", "John");
expectedDoc1.addField("last", "Doe");
expectedDoc1.addField("grade", 8);
expectedDoc1.addField("subject", "Math");
expectedDoc1.addField("test", "term1");
expectedDoc1.addField("marks", 90);
}
static final SolrDocument expectedDoc2 = new SolrDocument();
static {
expectedDoc2.addField("first", "John");
expectedDoc2.addField("last", "Doe");
expectedDoc2.addField("grade", 8);
expectedDoc2.addField("subject", "Biology");
expectedDoc2.addField("test", "term1");
expectedDoc2.addField("marks", 86);
}
/**
* Creates a base TestRunner with Solr Type of standard and json update path.
*/
private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) {
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
return runner;
}
@Test
public void testUpdateWithSolrJson() throws IOException, SolrServerException {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false");
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
}
}
@Test
public void testUpdateWithCustomJson() throws IOException, SolrServerException {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
"split=/exams" +
"&f=first:/first" +
"&f=last:/last" +
"&f=grade:/grade" +
"&f=subject:/exams/subject" +
"&f=test:/exams/test" +
"&f=marks:/exams/marks");
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
}
}
@Test
public void testUpdateWithCsv() throws IOException, SolrServerException {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/csv");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
"fieldnames=first,last,grade,subject,test,marks");
try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
}
}
@Test
public void testUpdateWithXml() throws IOException, SolrServerException {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update");
runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml");
try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
}
}
@Test
public void testSolrServerExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrServerException("Error communicating with Solr");
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
}
}
@Test
public void testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error");
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
}
}
@Test
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new HttpSolrClient.RemoteSolrException(
"host", 401, "error", new NumberFormatException());
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
final TestRunner runner = createDefaultTestRunner(proc);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
}
}
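// The three exception tests above pin down the processor's routing contract:
// SolrServerException (a communication problem) goes to connection_failure, while
// SolrException -- including HttpSolrClient.RemoteSolrException, which extends it --
// goes to failure. A minimal sketch of that dispatch, assuming NiFi's ProcessSession
// and FlowFile; this is an illustration, not the processor's actual onTrigger code
// (which lives elsewhere in this commit):
private void routeAfterSolrRequest(final ProcessSession session, final FlowFile flowFile,
        final SolrClient solr, final SolrRequest request) {
    try {
        solr.request(request);
        session.transfer(flowFile, PutSolrContentStream.REL_ORIGINAL);
    } catch (final SolrException e) {
        // Solr accepted the connection but rejected the request: not retryable
        session.transfer(flowFile, PutSolrContentStream.REL_FAILURE);
    } catch (final SolrServerException | IOException e) {
        // could not communicate with Solr at all: likely transient, so retryable
        session.transfer(flowFile, PutSolrContentStream.REL_CONNECTION_FAILURE);
    }
}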
@Test
public void testSolrTypeCloudShouldRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertNotValid();
runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "someCollection1");
runner.assertValid();
}
@Test
public void testSolrTypeStandardShouldNotRequireCollection() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertValid();
}
@Test
public void testRequestParamsShouldBeInvalid() {
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b");
runner.assertNotValid();
}
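// The REQUEST_PARAMS validator implied by this test rejects "a=1&b" because the
// second pair has no value. A hypothetical helper showing that key=value check
// (the processor's real validator is defined elsewhere in this commit):
private static boolean hasValidRequestParams(final String params) {
    for (final String pair : params.split("&")) {
        final String[] keyValue = pair.split("=");
        if (keyValue.length != 2 || keyValue[0].trim().isEmpty() || keyValue[1].trim().isEmpty()) {
            return false; // e.g. "b" in "a=1&b" has no value
        }
    }
    return true;
}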
/**
 * Overrides createSolrServer to inject a Mockito mock that throws the given Throwable.
 */
private class ExceptionThrowingProcessor extends PutSolrContentStream {
private SolrClient mockSolrServer;
private Throwable throwable;
public ExceptionThrowingProcessor(Throwable throwable) {
this.throwable = throwable;
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
mockSolrServer = Mockito.mock(SolrClient.class);
try {
when(mockSolrServer.request(any(SolrRequest.class))).thenThrow(throwable);
} catch (SolrServerException | IOException e) {
Assert.fail(e.getMessage());
}
return mockSolrServer;
}
}
/**
 * Overrides createSolrServer to create an EmbeddedSolrServer for the given core.
 */
private class EmbeddedSolrServerProcessor extends PutSolrContentStream {
private String coreName;
private SolrClient embeddedSolrServer;
public EmbeddedSolrServerProcessor(String coreName) {
this.coreName = coreName;
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
try {
String relPath = getClass().getProtectionDomain()
.getCodeSource().getLocation().getFile()
+ "../../target";
embeddedSolrServer = EmbeddedSolrServerFactory.create(
EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
EmbeddedSolrServerFactory.DEFAULT_CORE_HOME,
coreName, relPath);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
return embeddedSolrServer;
}
}
/**
 * Verify that the given SolrClient contains exactly the expected SolrDocuments.
 */
private static void verifySolrDocuments(SolrClient solrServer, Collection<SolrDocument> expectedDocuments)
throws IOException, SolrServerException {
solrServer.commit();
SolrQuery query = new SolrQuery("*:*");
QueryResponse qResponse = solrServer.query(query);
Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound());
// verify documents have expected fields and values
for (SolrDocument expectedDoc : expectedDocuments) {
boolean found = false;
for (SolrDocument solrDocument : qResponse.getResults()) {
boolean foundAllFields = true;
for (String expectedField : expectedDoc.getFieldNames()) {
Object expectedVal = expectedDoc.getFirstValue(expectedField);
Object actualVal = solrDocument.getFirstValue(expectedField);
// accumulate across fields so a single mismatched field fails the candidate document
foundAllFields = foundAllFields && expectedVal.equals(actualVal);
}
if (foundAllFields) {
found = true;
break;
}
}
Assert.assertTrue("Could not find " + expectedDoc, found);
}
}
}

View File

@ -0,0 +1,14 @@
# Logging level
solr.log=logs/
log4j.rootLogger=INFO, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.org.apache.hadoop=WARN
# set to INFO to enable infostream log messages
log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8" ?>
<solr>
<solrcloud>
<str name="host">${host:}</str>
<int name="hostPort">${jetty.port:8983}</int>
<str name="hostContext">${hostContext:solr}</str>
<int name="zkClientTimeout">${zkClientTimeout:30000}</int>
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
</solrcloud>
<shardHandlerFactory name="shardHandlerFactory"
class="HttpShardHandlerFactory">
<int name="socketTimeout">${socketTimeout:0}</int>
<int name="connTimeout">${connTimeout:0}</int>
</shardHandlerFactory>
</solr>

View File

@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# a couple of test stopwords to test that the words are really being
# configured from this file:
stopworda
stopwordb
# Standard english stop words taken from Lucene's StopAnalyzer
a
an
and
are
as
at
be
but
by
for
if
in
into
is
it
no
not
of
on
or
such
that
the
their
then
there
these
they
this
to
was
will
with

View File

@ -0,0 +1,21 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-----------------------------------------------------------------------
# Use a protected word file to protect against the stemmer reducing two
# unrelated words to the same base word.
# Some non-words that normally won't be encountered,
# just to test that they won't be stemmed.
dontstems
zwhacky

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<schema version="1.5" name="testCollection">
<fieldType name="string" class="solr.StrField"/>
<fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="first" type="string" indexed="true" stored="true" />
<field name="last" type="string" indexed="true" stored="true" />
<field name="grade" type="int" indexed="true" stored="true" />
<field name="marks" type="int" indexed="true" stored="true" />
<field name="test" type="string" indexed="true" stored="true" />
<field name="subject" type="string" indexed="true" stored="true" />
<field name="created" type="date" indexed="true" stored="true" />
</schema>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" ?>
<config>
<luceneMatchVersion>5.0.0</luceneMatchVersion>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<indexConfig>
<lockType>single</lockType>
</indexConfig>
<requestDispatcher handleSelect="false">
<httpCaching never304="true" />
</requestDispatcher>
<requestHandler name="/select" class="solr.SearchHandler" />
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
</config>

View File

@ -0,0 +1,29 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#-----------------------------------------------------------------------
#some test synonym mappings unlikely to appear in real input text
aaafoo => aaabar
bbbfoo => bbbfoo bbbbar
cccfoo => cccbar cccbaz
fooaaa,baraaa,bazaaa
# Some synonym groups specific to this example
GB,gib,gigabyte,gigabytes
MB,mib,megabyte,megabytes
Television, Televisions, TV, TVs
#notice we use "gib" instead of "GiB" so any WordDelimiterFilter coming
#after us won't split it into two words.
# Synonym mappings can be used for spelling correction too
pixima => pixma

View File

@ -0,0 +1,2 @@
John,Doe,8,Math,term1,90
John,Doe,8,Biology,term1,86
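The two rows above become the expectedDoc1/expectedDoc2 fixtures that testUpdateWithCsv verifies (the fixtures themselves are defined earlier in the test class, outside this excerpt). A sketch of the mapping implied by the fieldnames=first,last,grade,subject,test,marks request parameter; the helper name is hypothetical:

private static SolrDocument csvRowToDocument(final String row) {
    final String[] fieldNames = {"first", "last", "grade", "subject", "test", "marks"};
    final String[] values = row.split(",");
    final SolrDocument doc = new SolrDocument();
    for (int i = 0; i < fieldNames.length; i++) {
        doc.addField(fieldNames[i], values[i]);
    }
    return doc;
}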

View File

@ -0,0 +1,15 @@
{
"first": "John",
"last": "Doe",
"grade": 8,
"exams": [
{
"subject": "Math",
"test" : "term1",
"marks":90},
{
"subject": "Biology",
"test" : "term1",
"marks":86}
]
}

View File

@ -0,0 +1,18 @@
[
{
"first": "John",
"last": "Doe",
"grade": 8,
"subject": "Math",
"test" : "term1",
"marks": 90
},
{
"first": "John",
"last": "Doe",
"grade": 8,
"subject": "Biology",
"test" : "term1",
"marks": 86
}
]

View File

@ -0,0 +1,18 @@
<add>
<doc>
<field name="first">John</field>
<field name="last">Doe</field>
<field name="grade">8</field>
<field name="subject">Math</field>
<field name="test">term1</field>
<field name="marks">90</field>
</doc>
<doc>
<field name="first">John</field>
<field name="last">Doe</field>
<field name="grade">8</field>
<field name="subject">Biology</field>
<field name="test">term1</field>
<field name="marks">86</field>
</doc>
</add>

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-bundle</artifactId>
<packaging>pom</packaging>
<description>A bundle of processors that can store data in and retrieve data from Apache Solr</description>
<properties>
<solr.version>5.0.0</solr.version>
</properties>
<modules>
<module>nifi-solr-processors</module>
<module>nifi-solr-nar</module>
</modules>
</project>

View File

@ -35,6 +35,7 @@
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module>
<module>nifi-solr-bundle</module>
</modules>
<dependencyManagement>
<dependencies>

View File

@ -170,12 +170,12 @@
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
<version>3.5.2</version>
</dependency>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
@ -220,7 +220,7 @@
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.6.0-4</version>
</dependency>
</dependency>
<dependency>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
@ -231,10 +231,10 @@
<artifactId>quartz</artifactId>
<version>2.2.1</version>
<exclusions>
<!--
<!--
| Exclude the quartz 2.2.1 bundled version of c3p0 because it is lgpl licensed
| We also don't use the JDBC related features of quartz for which the dependency would matter
-->
-->
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
@ -303,7 +303,7 @@
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies
<!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies
section -->
<exclusion>
<groupId>commons-logging</groupId>
@ -470,7 +470,7 @@
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@ -516,7 +516,7 @@
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
@ -542,7 +542,7 @@
<artifactId>avro</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
@ -562,7 +562,7 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>${jersey.version}</version>
</dependency>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
@ -627,7 +627,7 @@
<artifactId>jetty-jsp-jdt</artifactId>
<version>2.3.3</version>
<scope>provided</scope>
</dependency>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -781,6 +781,12 @@
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-nar</artifactId>
@ -802,7 +808,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-logging-utils</artifactId>
@ -852,7 +858,7 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</dependencies>
<build>
<pluginManagement>
<plugins>
@ -999,7 +1005,7 @@
<requireMavenVersion>
<version>${maven.min-version}</version>
</requireMavenVersion>
</rules>
</rules>
</configuration>
</execution>
</executions>