diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 6682551638..fd23f28d10 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -219,7 +219,7 @@ false false - + 30 sec diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index d230cc178e..0af6043033 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -117,7 +117,7 @@ public class GetSolr extends SolrProcessor { final List descriptors = new ArrayList<>(); descriptors.add(SOLR_TYPE); descriptors.add(SOLR_LOCATION); - descriptors.add(DEFAULT_COLLECTION); + descriptors.add(COLLECTION); descriptors.add(SOLR_QUERY); descriptors.add(RETURN_FIELDS); descriptors.add(SORT_CLAUSE); @@ -153,14 +153,6 @@ public class GetSolr extends SolrProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final ProcessorLog logger = getLogger(); - - final FlowFile incomingFlowFile = session.get(); - if (incomingFlowFile != null) { - session.transfer(incomingFlowFile, REL_SUCCESS); - logger.warn("found FlowFile {} in input queue; transferring to success", - new Object[]{incomingFlowFile}); - } - readLastEndDate(); final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); @@ -201,7 +193,7 @@ public class GetSolr extends SolrProcessor { try { // run the initial query and send out the first page of results final StopWatch stopWatch = new StopWatch(true); - QueryResponse response = getSolrServer().query(solrQuery); + QueryResponse response = getSolrClient().query(solrQuery); stopWatch.stop(); long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); @@ -218,7 +210,7 @@ public class GetSolr extends SolrProcessor { StringBuilder transitUri = new StringBuilder("solr://"); transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { - transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue()); + transitUri.append("/").append(context.getProperty(COLLECTION).getValue()); } session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); @@ -232,7 +224,7 @@ public class GetSolr extends SolrProcessor { solrQuery.setStart(endRow); stopWatch.start(); - response = getSolrServer().query(solrQuery); + response = getSolrClient().query(solrQuery); stopWatch.stop(); duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 704d8a21e1..7cef7a5a04 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -18,9 +18,10 @@ */ package org.apache.nifi.processors.solr; -import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -48,13 +49,16 @@ import java.util.concurrent.TimeUnit; @Tags({"Apache", "Solr", "Put", "Send"}) @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value", + description="These parameters will be passed to Solr on the request") public class PutSolrContentStream extends SolrProcessor { - public static final PropertyDescriptor CONTENT_STREAM_URL = new PropertyDescriptor - .Builder().name("Content Stream URL") - .description("The URL in Solr to post the ContentStream") + public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor + .Builder().name("Content Stream Path") + .description("The path in Solr to post the ContentStream") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue("/update/json/docs") .build(); @@ -63,9 +67,18 @@ public class PutSolrContentStream extends SolrProcessor { .description("Content-Type being sent to Solr") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .defaultValue("application/json") .build(); + public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor + .Builder().name("Commit Within") + .description("The number of milliseconds before the given update is committed") + .required(false) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor .Builder().name("Request Parameters") .description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2") @@ -74,8 +87,8 @@ public class PutSolrContentStream extends SolrProcessor { .defaultValue("json.command=false&split=/&f=id:/field1") .build(); - public static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") .description("The original FlowFile") .build(); @@ -89,10 +102,8 @@ public class PutSolrContentStream extends SolrProcessor { .description("FlowFiles that failed because Solr is unreachable") .build(); - /** - * The name of a FlowFile attribute used for specifying a Solr collection. - */ - public static final String SOLR_COLLECTION_ATTR = "solr.collection"; + public static final String COLLECTION_PARAM_NAME = "collection"; + public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; private Set relationships; private List descriptors; @@ -105,14 +116,15 @@ public class PutSolrContentStream extends SolrProcessor { final List descriptors = new ArrayList<>(); descriptors.add(SOLR_TYPE); descriptors.add(SOLR_LOCATION); - descriptors.add(DEFAULT_COLLECTION); - descriptors.add(CONTENT_STREAM_URL); + descriptors.add(COLLECTION); + descriptors.add(CONTENT_STREAM_PATH); descriptors.add(CONTENT_TYPE); + descriptors.add(COMMIT_WITHIN); descriptors.add(REQUEST_PARAMS); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); - relationships.add(REL_ORIGINAL); + relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); relationships.add(REL_CONNECTION_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); @@ -129,7 +141,18 @@ public class PutSolrContentStream extends SolrProcessor { } @Override - protected void additionalOnScheduled(ProcessContext context) { + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @OnScheduled + public void initializeRequestParams(ProcessContext context) { final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue(); this.requestParams = RequestParamsUtil.parse(requestParamsVal); } @@ -143,17 +166,19 @@ public class PutSolrContentStream extends SolrProcessor { final ObjectHolder error = new ObjectHolder<>(null); final ObjectHolder connectionError = new ObjectHolder<>(null); - final ObjectHolder collectionUsed = new ObjectHolder<>(null); - final String collectionAttrVal = flowFile.getAttribute(SOLR_COLLECTION_ATTR); final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); + final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); StopWatch timer = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - ContentStreamUpdateRequest request = new ContentStreamUpdateRequest( - context.getProperty(CONTENT_STREAM_URL).getValue()); + final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH) + .evaluateAttributeExpressions().getValue(); + + ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath); request.setParams(new ModifiableSolrParams()); // add the extra params, don't use 'set' in case of repeating params @@ -165,14 +190,13 @@ public class PutSolrContentStream extends SolrProcessor { } } - // send the request to the specified collection, or to the default collection + // specify the collection for SolrCloud if (isSolrCloud) { - String collection = collectionAttrVal; - if (StringUtils.isBlank(collection)) { - collection = context.getProperty(DEFAULT_COLLECTION).getValue(); - } - request.setParam("collection", collection); - collectionUsed.set(collection); + request.setParam(COLLECTION_PARAM_NAME, collection); + } + + if (commitWithin != null && commitWithin > 0) { + request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString()); } try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) { @@ -185,11 +209,11 @@ public class PutSolrContentStream extends SolrProcessor { @Override public String getContentType() { - return context.getProperty(CONTENT_TYPE).getValue(); + return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue(); } }); - UpdateResponse response = request.process(getSolrServer()); + UpdateResponse response = request.process(getSolrClient()); getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()}); } catch (SolrException e) { error.set(e); @@ -213,14 +237,32 @@ public class PutSolrContentStream extends SolrProcessor { StringBuilder transitUri = new StringBuilder("solr://"); transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); if (isSolrCloud) { - transitUri.append(":").append(collectionUsed.get()); + transitUri.append(":").append(collection); } final long duration = timer.getDuration(TimeUnit.MILLISECONDS); session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true); getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration}); - session.transfer(flowFile, REL_ORIGINAL); + session.transfer(flowFile, REL_SUCCESS); } } + // get all of the dynamic properties and values into a Map for later adding to the Solr request + private Map getRequestParams(ProcessContext context, FlowFile flowFile) { + final Map paramsMap = new HashMap<>(); + + for (final Map.Entry entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + final String paramName = descriptor.getName(); + final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); + + if (!paramValue.trim().isEmpty()) { + MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + } + } + } + return paramsMap; + } + } diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java index f286a1a182..70b71a421a 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -51,7 +51,6 @@ public abstract class SolrProcessor extends AbstractProcessor { .Builder().name("Solr Type") .description("The type of Solr instance, Cloud or Standard.") .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD) .defaultValue(SOLR_TYPE_STANDARD.getValue()) .build(); @@ -64,58 +63,48 @@ public abstract class SolrProcessor extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final PropertyDescriptor DEFAULT_COLLECTION = new PropertyDescriptor - .Builder().name("Default Collection") + public static final PropertyDescriptor COLLECTION = new PropertyDescriptor + .Builder().name("Collection") .description("The Solr collection name, only used with a Solr Type of Cloud") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); - private volatile SolrClient solrServer; + private volatile SolrClient solrClient; @OnScheduled public final void onScheduled(final ProcessContext context) throws IOException { - this.solrServer = createSolrServer(context); - additionalOnScheduled(context); + this.solrClient = createSolrClient(context); } /** - * Create a SolrServer based on the type of Solr specified. + * Create a SolrClient based on the type of Solr specified. * * @param context * The context - * @return an HttpSolrServer or CloudSolrServer + * @return an HttpSolrClient or CloudSolrClient */ - protected SolrClient createSolrServer(final ProcessContext context) { + protected SolrClient createSolrClient(final ProcessContext context) { if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue()); } else { CloudSolrClient cloudSolrServer = new CloudSolrClient( context.getProperty(SOLR_LOCATION).getValue()); cloudSolrServer.setDefaultCollection( - context.getProperty(DEFAULT_COLLECTION).getValue()); + context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue()); return cloudSolrServer; } } /** * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the - * {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} method + * {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} method * * @return */ - protected final SolrClient getSolrServer() { - return solrServer; - } - - /** - * Allows additional action to be taken during scheduling of processor. - * - * @param context - * The context - */ - protected void additionalOnScheduled(final ProcessContext context) { - + protected final SolrClient getSolrClient() { + return solrClient; } @Override @@ -123,10 +112,10 @@ public abstract class SolrProcessor extends AbstractProcessor { final List problems = new ArrayList<>(); if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { - final String collection = context.getProperty(DEFAULT_COLLECTION).getValue(); + final String collection = context.getProperty(COLLECTION).getValue(); if (collection == null || collection.trim().isEmpty()) { problems.add(new ValidationResult.Builder() - .subject(DEFAULT_COLLECTION.getName()) + .subject(COLLECTION.getName()) .input(collection).valid(false) .explanation("A collection must specified for Solr Type of Cloud") .build()); diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java index 52eb06b3e1..dcae008595 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -183,7 +183,7 @@ public class TestGetSolr { } - // Override createSolrServer and return the passed in SolrClient + // Override createSolrClient and return the passed in SolrClient private class TestableProcessor extends GetSolr { private SolrClient solrClient; @@ -191,7 +191,7 @@ public class TestGetSolr { this.solrClient = solrClient; } @Override - protected SolrClient createSolrServer(ProcessContext context) { + protected SolrClient createSolrClient(ProcessContext context) { return solrClient; } } diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index 88009788ba..98761c4947 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -82,7 +82,7 @@ public class TestPutSolrContentStream { final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); final TestRunner runner = createDefaultTestRunner(proc); - runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs"); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false"); try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { @@ -91,11 +91,11 @@ public class TestPutSolrContentStream { runner.run(); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); - runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); - verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); } finally { - try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + try { proc.getSolrClient().shutdown(); } catch (Exception e) { } } } @@ -104,7 +104,7 @@ public class TestPutSolrContentStream { final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); final TestRunner runner = createDefaultTestRunner(proc); - runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs"); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "split=/exams" + "&f=first:/first" + @@ -120,11 +120,11 @@ public class TestPutSolrContentStream { runner.run(); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); - runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); - verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); } finally { - try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + try { proc.getSolrClient().shutdown(); } catch (Exception e) { } } } @@ -133,7 +133,7 @@ public class TestPutSolrContentStream { final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); final TestRunner runner = createDefaultTestRunner(proc); - runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/csv"); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv"); runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "fieldnames=first,last,grade,subject,test,marks"); @@ -143,11 +143,11 @@ public class TestPutSolrContentStream { runner.run(); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); - runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); - verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); } finally { - try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + try { proc.getSolrClient().shutdown(); } catch (Exception e) { } } } @@ -156,7 +156,7 @@ public class TestPutSolrContentStream { final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); final TestRunner runner = createDefaultTestRunner(proc); - runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update"); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update"); runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml"); try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) { @@ -165,11 +165,11 @@ public class TestPutSolrContentStream { runner.run(); runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); - runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1); - verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2)); } finally { - try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + try { proc.getSolrClient().shutdown(); } catch (Exception e) { } } } @@ -185,7 +185,7 @@ public class TestPutSolrContentStream { runner.run(); runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1); - verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class)); } } @@ -201,7 +201,7 @@ public class TestPutSolrContentStream { runner.run(); runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); - verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class)); } } @@ -218,7 +218,7 @@ public class TestPutSolrContentStream { runner.run(); runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); - verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class)); } } @@ -230,7 +230,7 @@ public class TestPutSolrContentStream { runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); runner.assertNotValid(); - runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "someCollection1"); + runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1"); runner.assertValid(); } @@ -264,7 +264,7 @@ public class TestPutSolrContentStream { } @Override - protected SolrClient createSolrServer(ProcessContext context) { + protected SolrClient createSolrClient(ProcessContext context) { mockSolrServer = Mockito.mock(SolrClient.class); try { when(mockSolrServer.request(any(SolrRequest.class))).thenThrow(throwable); @@ -279,7 +279,7 @@ public class TestPutSolrContentStream { } /** - * Override the createSolrServer method and create and EmbeddedSolrServer. + * Override the createSolrClient method and create and EmbeddedSolrServer. */ private class EmbeddedSolrServerProcessor extends PutSolrContentStream { @@ -291,7 +291,7 @@ public class TestPutSolrContentStream { } @Override - protected SolrClient createSolrServer(ProcessContext context) { + protected SolrClient createSolrClient(ProcessContext context) { try { String relPath = getClass().getProtectionDomain() .getCodeSource().getLocation().getFile()