NIFI-3248: Improvement of GetSolr Processor

This closes #2199.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
JohannesDaniel 2017-10-05 22:57:53 +02:00 committed by Koji Kawamura
parent c894246e03
commit c06dee2321
12 changed files with 641 additions and 270 deletions

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml

@@ -26,6 +26,18 @@
     <packaging>jar</packaging>
     <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.solr</groupId>
             <artifactId>solr-solrj</artifactId>
@@ -61,6 +73,12 @@
             <scope>provided</scope>
         </dependency>
         <!-- test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
@@ -136,6 +154,7 @@
                         <exclude>src/test/resources/testdata/test-xml-multiple-docs.xml</exclude>
                         <exclude>src/test/resources/log4j.properties</exclude>
                         <exclude>src/test/resources/jaas-client.conf</exclude>
+                        <exclude>src/test/resources/test-schema.avsc</exclude>
                     </excludes>
                 </configuration>
             </plugin>

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java

@@ -18,35 +18,37 @@
  */
 package org.apache.nifi.processors.solr;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Properties;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -57,7 +59,18 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StringUtils;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.request.QueryRequest;
@@ -66,42 +79,72 @@ import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
+    public static final String STATE_MANAGER_FILTER = "stateManager_filter";
+    public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark";
+    public static final AllowableValue MODE_XML = new AllowableValue("XML");
+    public static final AllowableValue MODE_REC = new AllowableValue("Records");
+
+    public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
+            .Builder().name("Return Type")
+            .displayName("Return Type")
+            .description("Write Solr documents to FlowFiles as XML or using a Record Writer")
+            .required(true)
+            .allowableValues(MODE_XML, MODE_REC)
+            .defaultValue(MODE_XML.getValue())
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor
+            .Builder().name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
             .Builder().name("Solr Query")
+            .displayName("Solr Query")
             .description("A query to execute against Solr")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
-            .Builder().name("Return Fields")
-            .description("Comma-separated list of fields names to return")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor
-            .Builder().name("Sort Clause")
-            .description("A Solr sort clause, ex: field1 asc, field2 desc")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
             .Builder().name("Date Field")
+            .displayName("Date Field")
             .description("The name of a date field in Solr used to filter results")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor
+            .Builder().name("Initial Date Filter")
+            .displayName("Initial Date Filter")
+            .description("Date value to filter results. Documents with an earlier date will not be fetched. The format has to correspond to the date pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
+            .Builder().name("Return Fields")
+            .displayName("Return Fields")
+            .description("Comma-separated list of field names to return")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
             .Builder().name("Batch Size")
+            .displayName("Batch Size")
             .description("Number of rows per Solr query")
             .required(true)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
@@ -113,22 +156,17 @@ public class GetSolr extends SolrProcessor {
             .description("The results of querying Solr")
             .build();
 
-    static final String FILE_PREFIX = "conf/.getSolr-";
-    static final String LAST_END_DATE = "LastEndDate";
-    static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String UNINITIALIZED_LAST_END_DATE_VALUE;
+    private final AtomicBoolean clearState = new AtomicBoolean(false);
+    private final AtomicBoolean dateFieldNotInSpecifiedFieldsList = new AtomicBoolean(false);
+    private volatile String id_field = null;
+
+    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
 
     static {
-        SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-        UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L));
+        df.setTimeZone(TimeZone.getTimeZone("GMT"));
     }
 
-    final AtomicReference<String> lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE);
-
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
-    private final Lock fileLock = new ReentrantLock();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -138,10 +176,12 @@ public class GetSolr extends SolrProcessor {
         descriptors.add(SOLR_TYPE);
         descriptors.add(SOLR_LOCATION);
         descriptors.add(COLLECTION);
+        descriptors.add(RETURN_TYPE);
+        descriptors.add(RECORD_WRITER);
         descriptors.add(SOLR_QUERY);
-        descriptors.add(RETURN_FIELDS);
-        descriptors.add(SORT_CLAUSE);
         descriptors.add(DATE_FIELD);
+        descriptors.add(DATE_FILTER);
+        descriptors.add(RETURN_FIELDS);
         descriptors.add(BATCH_SIZE);
         descriptors.add(JAAS_CLIENT_APP_NAME);
         descriptors.add(BASIC_USERNAME);
@@ -170,160 +210,226 @@ public class GetSolr extends SolrProcessor {
         return this.descriptors;
     }
 
+    final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>();
+    static {
+        propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+        propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+        propertyNamesForActivatingClearState.add(COLLECTION.getName());
+        propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+        propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+        propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+        propertyNamesForActivatingClearState.add(DATE_FILTER.getName());
+    }
+
     @Override
     public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
-        lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+        if (propertyNamesForActivatingClearState.contains(descriptor.getName()))
+            clearState.set(true);
     }
 
-    @OnStopped
-    public void onStopped() {
-        writeLastEndDate();
-    }
-
-    @OnRemoved
-    public void onRemoved() {
-        final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
-        if (lastEndDateCache.exists()) {
-            lastEndDateCache.delete();
-        }
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final ComponentLog logger = getLogger();
-        readLastEndDate();
-
-        final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-        final String currDate = sdf.format(new Date());
-
-        final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-        final String query = context.getProperty(SOLR_QUERY).getValue();
-        final SolrQuery solrQuery = new SolrQuery(query);
-        solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-        // if initialized then apply a filter to restrict results from the last end time til now
-        if (initialized) {
-            StringBuilder filterQuery = new StringBuilder();
-            filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-                    .append(":{").append(lastEndDatedRef.get()).append(" TO ")
-                    .append(currDate).append("]");
-            solrQuery.addFilterQuery(filterQuery.toString());
-            logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
-        }
-
-        final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
-        if (returnFields != null && !returnFields.trim().isEmpty()) {
-            for (String returnField : returnFields.trim().split("[,]")) {
-                solrQuery.addField(returnField.trim());
-            }
-        }
-
-        final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
-        if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-            for (String sortClause : fullSortClause.split("[,]")) {
-                String[] sortParts = sortClause.trim().split("[ ]");
-                solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
-            }
-        }
+    @OnScheduled
+    public void clearState(final ProcessContext context) throws IOException {
+        if (clearState.getAndSet(false))
+            context.getStateManager().clear(Scope.CLUSTER);
+
+        final Map<String,String> stateMap = new HashMap<String,String>();
+        stateMap.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap());
+        final AtomicBoolean stateMapHasChanged = new AtomicBoolean(false);
+
+        if (stateMap.get(STATE_MANAGER_CURSOR_MARK) == null) {
+            stateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
+            stateMapHasChanged.set(true);
+        }
+
+        if (stateMap.get(STATE_MANAGER_FILTER) == null) {
+            final String initialDate = context.getProperty(DATE_FILTER).getValue();
+            if (StringUtils.isBlank(initialDate))
+                stateMap.put(STATE_MANAGER_FILTER, "*");
+            else
+                stateMap.put(STATE_MANAGER_FILTER, initialDate);
+            stateMapHasChanged.set(true);
+        }
+
+        if (stateMapHasChanged.get())
+            context.getStateManager().setState(stateMap, Scope.CLUSTER);
+
+        id_field = null;
+    }
+
+    @Override
+    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+        final Collection<ValidationResult> problems = new ArrayList<>();
+
+        if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+                && !context.getProperty(RECORD_WRITER).isSet()) {
+            problems.add(new ValidationResult.Builder()
+                    .explanation("for writing records a record writer has to be configured")
+                    .valid(false)
+                    .subject("Record writer check")
+                    .build());
+        }
+        return problems;
+    }
 
-        try {
-            final QueryRequest req = new QueryRequest(solrQuery);
-            if (isBasicAuthEnabled()) {
-                req.setBasicAuthCredentials(getUsername(), getPassword());
-            }
+    private String getFieldNameOfUniqueKey() {
+        final SolrQuery solrQuery = new SolrQuery();
+        try {
+            solrQuery.setRequestHandler("/schema/uniquekey");
+            final QueryRequest req = new QueryRequest(solrQuery);
+            if (isBasicAuthEnabled()) {
+                req.setBasicAuthCredentials(getUsername(), getPassword());
+            }
 
-            // run the initial query and send out the first page of results
-            final StopWatch stopWatch = new StopWatch(true);
-            QueryResponse response = req.process(getSolrClient());
-            stopWatch.stop();
+            return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
+        } catch (SolrServerException | IOException e) {
+            getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e);
+            throw new ProcessException(e);
+        }
+    }
 
-            long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-            final SolrDocumentList documentList = response.getResults();
-            logger.info("Retrieved {} results from Solr for {} in {} ms",
-                    new Object[] {documentList.getNumFound(), query, duration});
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final ComponentLog logger = getLogger();
+        final AtomicBoolean continuePaging = new AtomicBoolean(true);
+        final SolrQuery solrQuery = new SolrQuery();
+
+        try {
+            if (id_field == null) {
+                id_field = getFieldNameOfUniqueKey();
+            }
+
+            final String dateField = context.getProperty(DATE_FIELD).getValue();
+
+            final Map<String,String> stateMap = new HashMap<String,String>();
+            stateMap.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap());
+
+            solrQuery.setQuery("*:*");
+            final String query = context.getProperty(SOLR_QUERY).getValue();
+            if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
+                solrQuery.addFilterQuery(query);
+            }
+
+            final StringBuilder automatedFilterQuery = (new StringBuilder())
+                    .append(dateField)
+                    .append(":[")
+                    .append(stateMap.get(STATE_MANAGER_FILTER))
+                    .append(" TO *]");
+            solrQuery.addFilterQuery(automatedFilterQuery.toString());
+
+            final List<String> fieldList = new ArrayList<String>();
+            final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
+            if (!StringUtils.isBlank(returnFields)) {
+                fieldList.addAll(Arrays.asList(returnFields.trim().split("[,]")));
+                if (!fieldList.contains(dateField)) {
+                    fieldList.add(dateField);
+                    dateFieldNotInSpecifiedFieldsList.set(true);
+                }
+                for (String returnField : fieldList) {
+                    solrQuery.addField(returnField.trim());
+                }
+            }
+
+            solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, stateMap.get(STATE_MANAGER_CURSOR_MARK));
+            solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
+
+            final StringBuilder sortClause = (new StringBuilder())
+                    .append(dateField)
+                    .append(" asc,")
+                    .append(id_field)
+                    .append(" asc");
+            solrQuery.setParam("sort", sortClause.toString());
+
+            while (continuePaging.get()) {
+                final QueryRequest req = new QueryRequest(solrQuery);
+                if (isBasicAuthEnabled()) {
+                    req.setBasicAuthCredentials(getUsername(), getPassword());
+                }
+
+                logger.debug(solrQuery.toQueryString());
+
+                final QueryResponse response = req.process(getSolrClient());
+                final SolrDocumentList documentList = response.getResults();
 
-            if (documentList != null && documentList.getNumFound() > 0) {
-                FlowFile flowFile = session.create();
-                flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
-                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
-                session.transfer(flowFile, REL_SUCCESS);
+                if (response.getResults().size() > 0) {
+                    final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1);
+                    final String latestDateValue = df.format(lastSolrDocument.get(dateField));
+                    final String newCursorMark = response.getNextCursorMark();
 
-                StringBuilder transitUri = new StringBuilder("solr://");
-                transitUri.append(getSolrLocation());
-                if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
-                    transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
-                }
+                    solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, newCursorMark);
+                    stateMap.put(STATE_MANAGER_CURSOR_MARK, newCursorMark);
+                    stateMap.put(STATE_MANAGER_FILTER, latestDateValue);
 
-                session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString());
 
-                // if initialized then page through the results and send out each page
-                if (initialized) {
-                    int endRow = response.getResults().size();
-                    long totalResults = response.getResults().getNumFound();
-
-                    while (endRow < totalResults) {
-                        solrQuery.setStart(endRow);
-
-                        stopWatch.start();
-                        response = getSolrClient().query(solrQuery);
-                        stopWatch.stop();
-
-                        duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-                        logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration});
-
-                        flowFile = session.create();
-                        flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
-                        session.transfer(flowFile, REL_SUCCESS);
-                        session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
-                        endRow += response.getResults().size();
-                    }
-                }
-            }
+                    if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
+                        if (dateFieldNotInSpecifiedFieldsList.get()) {
+                            for (SolrDocument doc : response.getResults()) {
+                                doc.removeFields(dateField);
+                            }
+                        }
+                        flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
+                        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
+                    } else {
+                        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+                        final RecordSchema schema = writerFactory.getSchema(null, null);
+                        final RecordSet recordSet = solrDocumentsToRecordSet(response.getResults(), schema);
+                        final StringBuffer mimeType = new StringBuffer();
+                        flowFile = session.write(flowFile, new OutputStreamCallback() {
+                            @Override
+                            public void process(final OutputStream out) throws IOException {
+                                try {
+                                    final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
+                                    writer.write(recordSet);
+                                    writer.flush();
+                                    mimeType.append(writer.getMimeType());
+                                } catch (SchemaNotFoundException e) {
+                                    throw new ProcessException("Could not parse Solr response", e);
+                                }
+                            }
+                        });
+                        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
+                    }
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+                continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));
+            }
 
-            lastEndDatedRef.set(currDate);
-            writeLastEndDate();
-        } catch (SolrServerException | IOException e) {
+            context.getStateManager().setState(stateMap, Scope.CLUSTER);
+        } catch(SolrServerException | SchemaNotFoundException | IOException e){
             context.yield();
             session.rollback();
-            logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e);
+            logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), e}, e);
             throw new ProcessException(e);
         } catch( final Throwable t){
             context.yield();
             session.rollback();
-            logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t);
+            logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), t}, t);
             throw t;
         }
     }
 
-    private void readLastEndDate() {
-        fileLock.lock();
-        File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
-        try (FileInputStream fis = new FileInputStream(lastEndDateCache)) {
-            Properties props = new Properties();
-            props.load(fis);
-            lastEndDatedRef.set(props.getProperty(LAST_END_DATE));
-        } catch (IOException swallow) {
-        } finally {
-            fileLock.unlock();
-        }
-    }
-
-    private void writeLastEndDate() {
-        fileLock.lock();
-        File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
-        try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
-            Properties props = new Properties();
-            props.setProperty(LAST_END_DATE, lastEndDatedRef.get());
-            props.store(fos, "GetSolr LastEndDate value");
-        } catch (IOException e) {
-            getLogger().error("Failed to persist LastEndDate due to " + e, e);
-        } finally {
-            fileLock.unlock();
-        }
-    }
+    /**
+     * Writes each SolrDocument to a record.
+     */
+    private RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
+        final List<Record> lr = new ArrayList<Record>();
+
+        for (SolrDocument doc : docs) {
+            final Map<String, Object> recordValues = new LinkedHashMap<>();
+            for (RecordField field : schema.getFields()){
+                final Object fieldValue = doc.getFieldValue(field.getFieldName());
+                if (fieldValue != null) {
+                    if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
+                        recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
+                    } else {
+                        recordValues.put(field.getFieldName(), fieldValue);
+                    }
+                }
+            }
+            lr.add(new MapRecord(schema, recordValues));
+        }
+        return new ListRecordSet(schema, lr);
+    }
 
     /**
      * Writes each SolrDocument in XML format to the OutputStream.
@@ -337,14 +443,16 @@ public class GetSolr extends SolrProcessor {
         @Override
         public void process(OutputStream out) throws IOException {
+            IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
             for (SolrDocument doc : response.getResults()) {
-                String xml = ClientUtils.toXML(toSolrInputDocument(doc));
+                final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
                 IOUtils.write(xml, out, StandardCharsets.UTF_8);
             }
+            IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
         }
 
         public SolrInputDocument toSolrInputDocument(SolrDocument d) {
-            SolrInputDocument doc = new SolrInputDocument();
+            final SolrInputDocument doc = new SolrInputDocument();
             for (String name : d.getFieldNames()) {
                 doc.addField(name, d.getFieldValue(name));
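
The core of this change replaces start/rows offset paging with Solr's cursorMark deep paging, persisting the cursor and the latest date value in NiFi's cluster state between runs. As a minimal standalone sketch of the cursorMark pattern the new onTrigger builds on (assuming a plain SolrJ HttpSolrClient against a hypothetical local collection; this is an illustration, not the processor code):

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.params.CursorMarkParams;

    public class CursorMarkPagingSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical endpoint; replace with a real collection URL.
            try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/testCollection").build()) {
                SolrQuery query = new SolrQuery("*:*");
                query.setRows(10);
                // Cursor paging requires a stable total ordering ending on the uniqueKey field.
                query.setSort(SolrQuery.SortClause.asc("created"));
                query.addSort(SolrQuery.SortClause.asc("id"));

                String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
                boolean done = false;
                while (!done) {
                    query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
                    QueryResponse response = client.query(query);
                    for (SolrDocument doc : response.getResults()) {
                        System.out.println(doc.getFieldValue("id"));
                    }
                    String nextCursorMark = response.getNextCursorMark();
                    // Solr hands back the same cursor once the result set is exhausted.
                    done = cursorMark.equals(nextCursorMark);
                    cursorMark = nextCursorMark;
                }
            }
        }
    }

Unlike this canonical loop, which stops when getNextCursorMark() returns the cursor unchanged, the processor stops paging as soon as a batch comes back smaller than Batch Size; both approaches depend on the same requirement that the sort ends on the uniqueKey field.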

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java

@@ -275,7 +275,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
     }
 
     @Override
-    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+    final protected Collection<ValidationResult> customValidate(ValidationContext context) {
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
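
For context: customValidate stays final in the base class, so subclasses cannot replace the common Solr validation; instead they contribute checks through the additionalCustomValidation hook that GetSolr overrides above. A minimal self-contained sketch of that template-method arrangement (simplified stand-in types, not the actual NiFi classes):

    import java.util.ArrayList;
    import java.util.Collection;

    // Simplified stand-ins for the NiFi validation types.
    class ValidationContext {}
    class ValidationResult {}

    abstract class SolrProcessorSketch {
        // Final: the base class owns the validation sequence.
        protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
            final Collection<ValidationResult> problems = new ArrayList<>();
            // ... common Solr checks (location, credentials, SSL) would run here ...
            problems.addAll(additionalCustomValidation(context));
            return problems;
        }

        // Hook for subclasses such as GetSolr to add their own checks.
        protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
            return new ArrayList<>();
        }
    }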

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.GetSolr/additionalDetails.html

@@ -0,0 +1,56 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>GetSolr</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+<h2>Usage Example</h2>
+<p>
+    This processor pulls data from Solr collections. To use it, Solr collections
+    have to fulfil two requirements:
+</p>
+<ul>
+    <li>The documents must include a date field containing the time when they were
+        indexed. Such a field can easily be added to documents at indexing time,
+        e.g. using the Solr UpdateRequestProcessor created by
+        'TimestampUpdateProcessorFactory'.</li>
+    <li>The configuration of the Solr index (e.g. schema.xml or managed-schema) must
+        define a uniqueKey field.</li>
+</ul>
+<p>
+    Backwards compatibility with configurations of the GetSolr processor from NiFi
+    releases prior to 1.5 can be achieved as follows:
+</p>
+<ul>
+    <li>Find the file conf/.getSolr* within the prior NiFi installation.</li>
+    <li>Open the file and copy the timestamp defined for 'LastEndDate'.</li>
+    <li>Insert the timestamp into the field 'Initial Date Filter'.</li>
+</ul>
+<p>
+    Note: The value of the property 'Solr Query' is not added to the parameter 'q'
+    but to the parameter 'fq', for two reasons:
+</p>
+<ul>
+    <li>It improves performance by leveraging Solr's filter cache.</li>
+    <li>Scoring is not required for the purpose of this processor, as the sorting
+        is fixed to 'DateField asc, IdField asc'.</li>
+</ul>
+</body>
+</html>
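
The first requirement above is typically met with a timestamp update processor chain in Solr. A minimal sketch of the relevant solrconfig.xml fragment (the chain name and field name are illustrative, not part of this commit):

    <updateRequestProcessorChain name="add-timestamp" default="true">
      <!-- Stamps each incoming document with the index time. -->
      <processor class="solr.TimestampUpdateProcessorFactory">
        <str name="fieldName">created</str>
      </processor>
      <processor class="solr.LogUpdateProcessorFactory"/>
      <!-- RunUpdateProcessorFactory must close the chain so the update is applied. -->
      <processor class="solr.RunUpdateProcessorFactory"/>
    </updateRequestProcessorChain>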

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java

@@ -18,9 +18,15 @@
  */
 package org.apache.nifi.processors.solr;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.common.SolrInputDocument;
@@ -29,20 +35,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
 import java.util.Date;
-import java.util.GregorianCalendar;
 import java.util.Locale;
-import java.util.Properties;
 import java.util.TimeZone;
 
-import static org.junit.Assert.assertTrue;
-
 public class TestGetSolr {
 
     static final String DEFAULT_SOLR_CORE = "testCollection";
@@ -51,13 +51,9 @@ public class TestGetSolr {
 
     @Before
     public void setup() {
 
-        // create the conf dir if it doesn't exist
-        File confDir = new File("conf");
-        if (!confDir.exists()) {
-            confDir.mkdir();
-        }
-
         try {
             // create an EmbeddedSolrServer for the processor to use
             String relPath = getClass().getProtectionDomain().getCodeSource()
                     .getLocation().getFile() + "../../target";
@@ -65,104 +61,48 @@ public class TestGetSolr {
             solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
                     DEFAULT_SOLR_CORE, relPath);
 
-            // create some test documents
-            SolrInputDocument doc1 = new SolrInputDocument();
-            doc1.addField("first", "bob");
-            doc1.addField("last", "smith");
-            doc1.addField("created", new Date());
-
-            SolrInputDocument doc2 = new SolrInputDocument();
-            doc2.addField("first", "alice");
-            doc2.addField("last", "smith");
-            doc2.addField("created", new Date());
-
-            SolrInputDocument doc3 = new SolrInputDocument();
-            doc3.addField("first", "mike");
-            doc3.addField("last", "smith");
-            doc3.addField("created", new Date());
-
-            SolrInputDocument doc4 = new SolrInputDocument();
-            doc4.addField("first", "john");
-            doc4.addField("last", "smith");
-            doc4.addField("created", new Date());
-
-            SolrInputDocument doc5 = new SolrInputDocument();
-            doc5.addField("first", "joan");
-            doc5.addField("last", "smith");
-            doc5.addField("created", new Date());
-
-            // add the test data to the index
-            solrClient.add(Arrays.asList(doc1, doc2, doc3, doc4, doc5));
+            for (int i = 0; i < 10; i++) {
+                SolrInputDocument doc = new SolrInputDocument();
+                doc.addField("id", "doc" + i);
+                doc.addField("created", new Date());
+                doc.addField("string_single", "single" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".1");
+                doc.addField("string_multi", "multi" + i + ".2");
+                doc.addField("integer_single", i);
+                doc.addField("integer_multi", 1);
+                doc.addField("integer_multi", 2);
+                doc.addField("integer_multi", 3);
+                doc.addField("double_single", 0.5 + i);
+
+                solrClient.add(doc);
+            }
             solrClient.commit();
         } catch (Exception e) {
-            e.printStackTrace();
             Assert.fail(e.getMessage());
         }
     }
 
     @After
     public void teardown() {
-        File confDir = new File("conf");
-        assertTrue(confDir.exists());
-        File[] files = confDir.listFiles();
-        if (files.length > 0) {
-            for (File file : files) {
-                assertTrue("Failed to delete " + file.getName(), file.delete());
-            }
-        }
-        assertTrue(confDir.delete());
-
         try {
             solrClient.close();
         } catch (Exception e) {
         }
     }
 
-    @Test
-    public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException {
-        final TestableProcessor proc = new TestableProcessor(solrClient);
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
-        // setup a lastEndDate file to simulate picking up from a previous end date
-        SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US);
-        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        Calendar cal = new GregorianCalendar();
-        cal.add(Calendar.MINUTE, -30);
-        final String lastEndDate = sdf.format(cal.getTime());
-
-        File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
-        try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) {
-            Properties props = new Properties();
-            props.setProperty(GetSolr.LAST_END_DATE, lastEndDate);
-            props.store(fos, "GetSolr LastEndDate value");
-        } catch (IOException e) {
-            Assert.fail("Failed to setup last end date value: " + e.getMessage());
-        }
-
-        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
-        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
-        runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
-        runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created");
-        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc");
-        runner.setProperty(GetSolr.DATE_FIELD, "created");
-        runner.setProperty(GetSolr.BATCH_SIZE, "2");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3);
-    }
-
     @Test
     public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException {
-        final TestableProcessor proc = new TestableProcessor(solrClient);
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
         TestRunner runner = TestRunners.newTestRunner(proc);
 
-        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
         runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
-        runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
-        runner.setProperty(GetSolr.RETURN_FIELDS, "created");
-        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
         runner.setProperty(GetSolr.DATE_FIELD, "created");
-        runner.setProperty(GetSolr.BATCH_SIZE, "10");
+        runner.setProperty(GetSolr.BATCH_SIZE, "20");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
 
         runner.run();
         runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1);
@@ -170,40 +110,243 @@ public class TestGetSolr {
 
     @Test
     public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException {
-        final TestableProcessor proc = new TestableProcessor(solrClient);
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
         TestRunner runner = TestRunners.newTestRunner(proc);
 
-        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
         runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
-        runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz");
-        runner.setProperty(GetSolr.RETURN_FIELDS, "created");
-        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
+        runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000");
         runner.setProperty(GetSolr.DATE_FIELD, "created");
-        runner.setProperty(GetSolr.BATCH_SIZE, "10");
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
 
         runner.run();
         runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
     }
 
     @Test
-    public void testOnRemovedRemovesState() throws IOException, SolrServerException {
-        final TestableProcessor proc = new TestableProcessor(solrClient);
+    public void testSolrModes() throws IOException, SolrServerException {
+    }
+
+    @Test(expected = java.lang.AssertionError.class)
+    public void testValidation() throws IOException, SolrServerException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
         TestRunner runner = TestRunners.newTestRunner(proc);
 
-        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
         runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
-        runner.setProperty(GetSolr.SOLR_QUERY, "last:smith");
-        runner.setProperty(GetSolr.RETURN_FIELDS, "created");
-        runner.setProperty(GetSolr.SORT_CLAUSE, "created desc");
         runner.setProperty(GetSolr.DATE_FIELD, "created");
-        runner.setProperty(GetSolr.BATCH_SIZE, "10");
+        runner.setProperty(GetSolr.BATCH_SIZE, "2");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue());
 
-        runner.run();
+        runner.run(1);
+    }
 
-        File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier());
-        Assert.assertTrue("State file should exist, but doesn't", lastEndDateCache.exists());
-        proc.onRemoved();
-        Assert.assertFalse("State file should have been removed, but wasn't", lastEndDateCache.exists());
+    @Test
+    public void testCompletenessDespiteUpdates() throws IOException, SolrServerException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        runner.run(1,false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+
+        SolrInputDocument doc0 = new SolrInputDocument();
+        doc0.addField("id", "doc0");
+        doc0.addField("created", new Date());
+        SolrInputDocument doc1 = new SolrInputDocument();
+        doc1.addField("id", "doc1");
+        doc1.addField("created", new Date());
+
+        solrClient.add(doc0);
+        solrClient.add(doc1);
+        solrClient.commit();
+
+        runner.run(1,true, false);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
+    }
+
+    @Test
+    public void testCompletenessDespiteDeletions() throws IOException, SolrServerException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        runner.run(1,false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+
+        SolrInputDocument doc10 = new SolrInputDocument();
+        doc10.addField("id", "doc10");
+        doc10.addField("created", new Date());
+        SolrInputDocument doc11 = new SolrInputDocument();
+        doc11.addField("id", "doc11");
+        doc11.addField("created", new Date());
+
+        solrClient.add(doc10);
+        solrClient.add(doc11);
+        solrClient.deleteById("doc0");
+        solrClient.deleteById("doc1");
+        solrClient.deleteById("doc2");
+        solrClient.commit();
+
+        runner.run(1,true, false);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
+    }
+
+    @Test
+    public void testInitialDateFilter() throws IOException, SolrServerException {
+        final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
+        df.setTimeZone(TimeZone.getTimeZone("GMT"));
+        final Date dateToFilter = new Date();
+
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter));
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        SolrInputDocument doc10 = new SolrInputDocument();
+        doc10.addField("id", "doc10");
+        doc10.addField("created", new Date());
+        SolrInputDocument doc11 = new SolrInputDocument();
+        doc11.addField("id", "doc11");
+        doc11.addField("created", new Date());
+
+        solrClient.add(doc10);
+        solrClient.add(doc11);
+        solrClient.commit();
+
+        runner.run(1,true, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
+    }
+
+    @Test
+    public void testPropertyModified() throws IOException, SolrServerException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        runner.run(1,false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+
+        // Change property contained in propertyNamesForActivatingClearState
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created,string_multi");
+        runner.run(1, false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+
+        // Change property not contained in propertyNamesForActivatingClearState
+        runner.setProperty(GetSolr.BATCH_SIZE, "2");
+        runner.run(1, true, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateCleared() throws IOException, SolrServerException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue());
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "1");
+        runner.setProperty(GetSolr.RETURN_FIELDS, "id,created");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        runner.run(1,false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+
+        // run without clearing statemanager
+        runner.run(1,false, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // run with cleared statemanager
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.run(1, true, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testRecordWriter() throws IOException, SolrServerException, InitializationException {
+        final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
+        runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr");
+        runner.setProperty(GetSolr.DATE_FIELD, "created");
+        runner.setProperty(GetSolr.BATCH_SIZE, "2");
+        runner.setProperty(GetSolr.COLLECTION, "testCollection");
+
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+        runner.setProperty(GetSolr.RECORD_WRITER, "writer");
+
+        runner.run(1,true, true);
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 5);
+        runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
     }
 
     // Override createSolrClient and return the passed in SolrClient

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java

@@ -69,6 +69,7 @@ public class TestPutSolrContentStream {
 
     static final SolrDocument expectedDoc1 = new SolrDocument();
     static {
+        expectedDoc1.addField("id", "1");
         expectedDoc1.addField("first", "John");
         expectedDoc1.addField("last", "Doe");
         expectedDoc1.addField("grade", 8);
@@ -79,6 +80,7 @@ public class TestPutSolrContentStream {
 
     static final SolrDocument expectedDoc2 = new SolrDocument();
     static {
+        expectedDoc2.addField("id", "2");
         expectedDoc2.addField("first", "John");
         expectedDoc2.addField("last", "Doe");
         expectedDoc2.addField("grade", 8);
@@ -137,6 +139,7 @@ public class TestPutSolrContentStream {
         runner.setProperty("f.4", "subject:/exams/subject");
         runner.setProperty("f.5", "test:/exams/test");
         runner.setProperty("f.6", "marks:/exams/marks");
+        runner.setProperty("f.7", "id:/exams/id");
 
         try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
             runner.enqueue(fileIn);
@@ -162,7 +165,7 @@ public class TestPutSolrContentStream {
         final TestRunner runner = createDefaultTestRunner(proc);
         runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv");
-        runner.setProperty("fieldnames", "first,last,grade,subject,test,marks");
+        runner.setProperty("fieldnames", "id,first,last,grade,subject,test,marks");
 
         try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
             runner.enqueue(fileIn);
@@ -219,6 +222,7 @@ public class TestPutSolrContentStream {
 
         // add a document so there is something to delete
         SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "1");
         doc.addField("first", "bob");
         doc.addField("last", "smith");
         doc.addField("created", new Date());

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml

@@ -16,6 +16,16 @@
     <field name="marks" type="int" indexed="true" stored="true" />
     <field name="test" type="string" indexed="true" stored="true" />
     <field name="subject" type="string" indexed="true" stored="true" />
     <field name="created" type="date" indexed="true" stored="true" />
+
+    <field name="id" type="string" indexed="true" stored="true" />
+    <field name="double_single" type="double" indexed="true" stored="true" />
+    <field name="integer_single" type="int" indexed="true" stored="true" />
+    <field name="integer_multi" type="int" indexed="true" stored="true" multiValued="true"/>
+    <field name="string_single" type="string" indexed="true" stored="true" />
+    <field name="string_multi" type="string" indexed="true" stored="true" multiValued="true"/>
+
+    <uniqueKey>id</uniqueKey>
+
 </schema>

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/test-schema.avsc

@@ -0,0 +1,25 @@
+{
+    "name": "testschema",
+    "namespace": "nifi",
+    "type": "record",
+    "fields": [
+        { "name": "id", "type": "string" },
+        { "name": "string_single", "type": "string" },
+        { "name": "string_multi",
+            "type": {
+                "type": "array",
+                "items": "string"
+            }
+        },
+        { "name": "integer_single", "type": "int" },
+        { "name": "integer_multi",
+            "type": {
+                "type": "array",
+                "items": "int"
+            }
+        },
+        { "name": "double_single", "type": "double" },
+        { "name": "created", "type": "long" },
+        { "name": "val_null", "type": "string" }
+    ]
+}

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv

@@ -1,2 +1,2 @@
-John,Doe,8,Math,term1,90
-John,Doe,8,Biology,term1,86
+1,John,Doe,8,Math,term1,90
+2,John,Doe,8,Biology,term1,86


nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json

@@ -4,10 +4,12 @@
     "grade": 8,
     "exams": [
         {
+            "id": "1",
             "subject": "Math",
             "test" : "term1",
             "marks":90},
         {
+            "id": "2",
             "subject": "Biology",
             "test" : "term1",
             "marks":86}

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-json-multiple-docs.json

@@ -1,5 +1,6 @@
 [
     {
+        "id": "1",
         "first": "John",
         "last": "Doe",
         "grade": 8,
@@ -8,6 +9,7 @@
         "marks": 90
     },
     {
+        "id": "2",
         "first": "John",
         "last": "Doe",
         "grade": 8,

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml

@@ -1,5 +1,6 @@
 <add>
     <doc>
+        <field name="id">1</field>
         <field name="first">John</field>
         <field name="last">Doe</field>
         <field name="grade">8</field>
@@ -8,6 +9,7 @@
         <field name="marks">90</field>
     </doc>
     <doc>
+        <field name="id">2</field>
         <field name="first">John</field>
         <field name="last">Doe</field>
         <field name="grade">8</field>