SOLR-14241: New delete() Stream Decorator

Chris Hostetter 2020-02-05 10:49:24 -07:00
parent 136dcbdbbc
commit c5d0391df9
9 changed files with 589 additions and 17 deletions

View File

@@ -172,6 +172,8 @@ New Features
* SOLR-14242: HdfsDirectory now supports indexing geo-points, ranges or shapes. (Adrien Grand)
* SOLR-14241: New delete() Stream Decorator (hossman)
Improvements
---------------------
* SOLR-14120: Define JavaScript methods 'includes' and 'startsWith' to ensure AdminUI can be displayed when using

View File

@@ -595,6 +595,45 @@ while(true) {
daemonStream.close();
----
== delete
The `delete` function wraps another function and uses the `id` and `\_version_` values found in the tuples to send <<uploading-data-with-index-handlers.adoc#delete-operations,Delete By Id>> requests to a SolrCloud collection.
This is similar to the `<<#update,update()>>` function described below.
=== delete Parameters
* `destinationCollection`: (Mandatory) The collection from which the documents will be deleted.
* `batchSize`: (Mandatory) The indexing batch size.
* `pruneVersionField`: (Optional, defaults to `false`) Whether to prune `\_version_` values from tuples.
* `StreamExpression`: (Mandatory)
=== delete Syntax
[source,text]
----
delete(collection1,
       batchSize=500,
       search(collection1,
              q=old_data:true,
              qt="/export",
              fl="id",
              sort="a_f asc, a_i asc"))
----
The example above consumes the tuples returned by the `search` function against `collection1` and converts the `id` value of each document found into a delete request against the same `collection1`.
[NOTE]
====
Unlike the `update()` function, `delete()` defaults to `pruneVersionField=false`, preserving any `\_version_` values found in the inner stream when converting the tuples to "Delete By ID" requests.
This ensures that, by default, using this stream will not delete any documents that were updated _after_ the `search(...)` was executed but _before_ the `delete(...)` processed that tuple (leveraging <<updating-parts-of-documents.adoc#optimistic-concurrency,Optimistic concurrency>> constraints).
Users who wish to ignore concurrent updates and delete all matched documents should set `pruneVersionField=true` (or ensure that the inner stream tuples do not include any `\_version_` values).
Users who anticipate concurrent updates, and wish to "skip" any failed deletes, should instead consider configuring the {solr-javadocs}/solr-core/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.html[`TolerantUpdateProcessorFactory`].
====
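For illustration only, the following SolrJ sketch (not part of this commit; the `solrUrl` value is an assumption) submits a `delete(...)` expression to the `/stream` handler, mirroring the pattern used by the tests added in this change. Including `\_version_` in the inner `fl` is what allows the optimistic concurrency behavior described above to apply:

[source,java]
----
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class DeleteExpressionSketch {
  public static void main(String[] args) throws Exception {
    String solrUrl = "http://localhost:8983/solr"; // assumption: a running SolrCloud node
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/stream");
    params.set("expr",
        "delete(collection1, batchSize=500,"
      + "  search(collection1, q=old_data:true, qt=\"/export\","
      + "         fl=\"id,_version_\", sort=\"a_f asc, a_i asc\"))");

    SolrStream stream = new SolrStream(solrUrl + "/collection1", params);
    try {
      stream.open();
      for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
        // each summary tuple reports running counts such as "totalIndexed"
        System.out.println(tuple.get("totalIndexed"));
      }
    } finally {
      stream.close();
    }
  }
}
----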
== eval
The `eval` function allows for use cases where new streaming expressions are generated on the fly and then evaluated.
@@ -1273,12 +1312,13 @@ unique(
== update
The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing.
The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing as Documents.
=== update Parameters
* `destinationCollection`: (Mandatory) The collection where the tuples will be indexed.
* `batchSize`: (Mandatory) The indexing batch size.
* `pruneVersionField`: (Optional, defaults to `true`) Whether to prune `\_version_` values from tuples.
* `StreamExpression`: (Mandatory)
=== update Syntax
@@ -1296,3 +1336,5 @@ The `update` function wraps another functions and sends the tuples to a SolrClou
----
The example above sends the tuples returned by the `search` function to the `destinationCollection` to be indexed.
Wrapping `search(...)` as shown in this example is the most common use of this decorator: reading documents from a collection as tuples, processing or modifying them in some way, and then adding them back to a new collection. For this reason, `pruneVersionField=true` is the default behavior, stripping any `\_version_` values found in the inner stream when converting the tuples to Solr documents, which prevents unexpected errors from <<updating-parts-of-documents.adoc#optimistic-concurrency,Optimistic concurrency>> constraints.
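As a similar hedged sketch (collection names and `solrUrl` are assumptions, reusing the same SolrJ classes as the `delete()` sketch above), such an `update(...)` expression can be submitted to the `/stream` handler; `\_version_` may safely appear in the inner `fl` because it is pruned by default:

[source,java]
----
// Sketch only: copy documents from collection1 into destinationCollection via /stream.
// Uses SolrStream, Tuple and ModifiableSolrParams from SolrJ, as in the delete() sketch.
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/stream");
params.set("expr",
    "update(destinationCollection, batchSize=500,"
  + "  search(collection1, q=*:*,"
  + "         fl=\"id,_version_,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"))");

SolrStream stream = new SolrStream(solrUrl + "/collection1", params);
try {
  stream.open();
  for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
    System.out.println(tuple.get("totalIndexed")); // running count of documents indexed
  }
} finally {
  stream.close();
}
----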

View File

@@ -39,6 +39,7 @@ public class Lang {
.withFunctionName("facet", FacetStream.class)
.withFunctionName("facet2D", Facet2DStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("delete", DeleteStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("topic", TopicStream.class)
.withFunctionName("commit", CommitStream.class)
@@ -359,4 +360,4 @@ public class Lang {
.withFunctionName("if", IfThenElseEvaluator.class)
.withFunctionName("convert", ConversionEvaluator.class);
}
}
}

View File

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
/**
* Uses tuples to identify the uniqueKey values of documents to be deleted
*/
public final class DeleteStream extends UpdateStream implements Expressible {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String ID_TUPLE_KEY = "id";
public DeleteStream(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
final Explanation explanation = super.toExplanation(factory);
explanation.setExpression("Delete docs from " + getCollectionName());
return explanation;
}
/**
* {@link DeleteStream} returns <code>false</code> so that Optimistic Concurrency Constraints are
* respected by default when using this stream to wrap a {@link SearchStream} query.
*/
@Override
protected boolean defaultPruneVersionField() {
return false;
}
/**
* Overrides implementation to extract the <code>"id"</code> and <code>"_version_"</code>
* (if included) from each document and use that information to construct a "Delete By Id" request.
* Any other fields (ie: Tuple values) are ignored.
*/
@Override
protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
if (documentBatch.size() == 0) {
return;
}
try {
// convert each doc into a deleteById request...
final UpdateRequest req = new UpdateRequest();
for (SolrInputDocument doc : documentBatch) {
final String id = doc.getFieldValue(ID_TUPLE_KEY).toString();
final Long version = getVersion(doc);
req.deleteById(id, version);
}
req.process(getCloudSolrClient(), getCollectionName());
} catch (SolrServerException | NumberFormatException | IOException e) {
log.warn("Unable to delete documents from collection due to unexpected error.", e);
String className = e.getClass().getName();
String message = e.getMessage();
throw new IOException(String.format(Locale.ROOT,"Unexpected error when deleting documents from collection %s- %s:%s", getCollectionName(), className, message));
}
}
/**
* Helper method that can handle String values when dealing with odd
* {@link Tuple} -&gt; {@link SolrInputDocument} conversions
* (ie: <code>tuple(..)</code> in tests)
*/
private static Long getVersion(final SolrInputDocument doc) throws NumberFormatException {
if (! doc.containsKey(VERSION_FIELD)) {
return null;
}
final Object v = doc.getFieldValue(VERSION_FIELD);
if (null == v) {
return null;
}
if (v instanceof Long) {
return (Long)v;
}
return Long.parseLong(v.toString());
}
}
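Purely as an illustrative sketch (not part of this commit; `zkHost` and `collection1` are assumptions), the new class can also be constructed programmatically through a StreamFactory, matching the "delete" registration added to Lang.java above:

import java.io.IOException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.DeleteStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class DeleteStreamSketch {
  public static void main(String[] args) throws IOException {
    String zkHost = "localhost:9983"; // assumption: embedded ZK of a local SolrCloud node
    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", zkHost)
        .withFunctionName("search", CloudSolrStream.class)
        .withFunctionName("delete", DeleteStream.class);

    StreamExpression expression = StreamExpressionParser.parse(
        "delete(collection1, batchSize=10,"
      + "  search(collection1, q=\"old_data:true\", qt=\"/export\","
      + "         fl=\"id,_version_\", sort=\"id asc\"))");

    TupleStream stream = new DeleteStream(expression, factory);
    stream.setStreamContext(new StreamContext());
    try {
      stream.open();
      for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
        System.out.println(tuple.get("totalIndexed")); // one summary tuple per batch
      }
    } finally {
      stream.close();
    }
  }
}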

View File

@@ -40,10 +40,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
/**
* Sends tuples emitted by a wrapped {@link TupleStream} as updates to a SolrCloud collection.
@@ -56,6 +56,13 @@ public class UpdateStream extends TupleStream implements Expressible {
private String collection;
private String zkHost;
private int updateBatchSize;
/**
* Indicates if the {@link CommonParams#VERSION_FIELD} should be removed from tuples when converting
* to Solr Documents.
* May be set per expression using the <code>"pruneVersionField"</code> named operand,
* defaults to the value returned by {@link #defaultPruneVersionField()}
*/
private boolean pruneVersionField;
private int batchNumber;
private long totalDocsIndex;
private PushBackStream tupleSource;
@@ -64,7 +71,6 @@ public class UpdateStream extends TupleStream implements Expressible {
private List<SolrInputDocument> documentBatch = new ArrayList();
private String coreName;
public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
String collectionName = factory.getValueOperand(expression, 0);
verifyCollectionName(collectionName, expression);
@@ -73,6 +79,7 @@ public class UpdateStream extends TupleStream implements Expressible {
verifyZkHost(zkHost, collectionName, expression);
int updateBatchSize = extractBatchSize(expression, factory);
pruneVersionField = factory.getBooleanOperand(expression, "pruneVersionField", defaultPruneVersionField());
//Extract underlying TupleStream.
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -80,7 +87,6 @@ public class UpdateStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
StreamExpression sourceStreamExpression = streamExpressions.get(0);
init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, updateBatchSize);
}
@@ -88,9 +94,10 @@ public class UpdateStream extends TupleStream implements Expressible {
if (updateBatchSize <= 0) {
throw new IOException(String.format(Locale.ROOT,"batchSize '%d' must be greater than 0.", updateBatchSize));
}
pruneVersionField = defaultPruneVersionField();
init(collectionName, tupleSource, zkHost, updateBatchSize);
}
private void init(String collectionName, TupleStream tupleSource, String zkHost, int updateBatchSize) {
this.collection = collectionName;
this.zkHost = zkHost;
@@ -98,6 +105,11 @@ public class UpdateStream extends TupleStream implements Expressible {
this.tupleSource = new PushBackStream(tupleSource);
}
/** The name of the collection being updated */
protected String getCollectionName() {
return collection;
}
@Override
public void open() throws IOException {
setCloudSolrClient();
@@ -257,6 +269,21 @@ public class UpdateStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize '%s' is not a valid integer.",expression, batchSizeStr));
}
}
/**
* Used during initialization to specify the default value for the <code>"pruneVersionField"</code> option.
* {@link UpdateStream} returns <code>true</code> for backcompat and to simplify slurping of data from one
* collection to another.
*/
protected boolean defaultPruneVersionField() {
return true;
}
/** Only viable after calling {@link #open} */
protected CloudSolrClient getCloudSolrClient() {
assert null != this.cloudSolrClient;
return this.cloudSolrClient;
}
private void setCloudSolrClient() {
if(this.cache != null) {
@@ -272,7 +299,8 @@ public class UpdateStream extends TupleStream implements Expressible {
private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
SolrInputDocument doc = new SolrInputDocument();
for (Object field : tuple.fields.keySet()) {
if (! field.equals(VERSION_FIELD)) {
if (! (field.equals(CommonParams.VERSION_FIELD) && pruneVersionField)) {
Object value = tuple.get(field);
if (value instanceof List) {
addMultivaluedField(doc, (String)field, (List<Object>)value);
@@ -292,7 +320,11 @@ public class UpdateStream extends TupleStream implements Expressible {
}
}
private void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
/**
* This method will be called on every batch of tuples consumed, after converting each tuple
* in that batch to a Solr Input Document.
*/
protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
if (documentBatch.size() == 0) {
return;
}
@@ -300,6 +332,12 @@ public class UpdateStream extends TupleStream implements Expressible {
try {
cloudSolrClient.add(collection, documentBatch);
} catch (SolrServerException | IOException e) {
// TODO: it would be nice if there was an option to "skipFailedBatches"
// TODO: and just record the batch failure info in the summary tuple for that batch and continue
//
// TODO: The summary batches (and/or stream error) should also pay attention to the error metadata
// from the SolrServerException ... and ideally also any TolerantUpdateProcessor metadata
log.warn("Unable to add documents to collection due to unexpected error.", e);
String className = e.getClass().getName();
String message = e.getMessage();

View File

@@ -35,6 +35,11 @@
</updateLog>
</updateHandler>
<updateRequestProcessorChain default="true">
<!-- be tolerant of errors for testing optimistic concurrency of delete() stream -->
<processor class="solr.TolerantUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<requestDispatcher>
<requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="-1" />

View File

@@ -45,7 +45,7 @@ import org.junit.Test;
public class TestLang extends SolrTestCase {
private static final String[] allFunctions = {
"search", "facet", "facet2D", "update", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
"search", "facet", "facet2D", "update", "delete", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
"unique", "top", "group", "reduce", "parallel", "rollup", "stats", "innerJoin",
"leftOuterJoin", "hashJoin", "outerHashJoin", "intersect", "complement", "sort",
"train", "features", "daemon", "shortestPath", "gatherNodes", "nodes",

View File

@@ -246,7 +246,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id='42',a_i=1,b_i=5))"));
"tuple(id=42,a_i=1,b_i=5))"));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
@@ -259,7 +259,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id='42',a_i=1,b_i=5))"));
"tuple(id=42,a_i=1,b_i=5))"));
// "WRITE" credentials should be required for 'update(...)'
solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
@@ -278,7 +278,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id='42',a_i=1,b_i=5))"));
"tuple(id=42,a_i=1,b_i=5))"));
solrStream.setCredentials(user, user);
@@ -296,7 +296,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id='42',a_i=1,b_i=5))"));
"tuple(id=42,a_i=1,b_i=5))"));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
@@ -318,7 +318,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
+ " search("+COLLECTION_Y+", "
+ " q=\"foo_i:[* TO 10]\", " // 10 matches = 1 batch
+ " rows=100, "
+ " fl=\"id,foo_i\", "
+ " fl=\"id,foo_i,_version_\", " // pruneVersionField default true
+ " sort=\"foo_i desc\")) "
;
@@ -370,7 +370,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id='42',a_i=1,b_i=5))"));
"tuple(id=42,a_i=1,b_i=5))"));
solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
// NOTE: Can't make any assertions about Exception: SOLR-14226
@@ -386,7 +386,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
final String expr
= "executor(threads=1, "
+ " tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
+ " tuple(id='42',a_i=1,b_i=5)) "
+ " tuple(id=42,a_i=1,b_i=5)) "
+ " \")) "
;
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
@@ -554,7 +554,226 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
public void testSimpleDeleteStream() throws Exception {
assertEquals(0,
(setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.add(sdoc("id", "42"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"delete("+COLLECTION_X+",batchSize=1," +
"tuple(id=42))"));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
/** A simple "Delete by Query" example */
public void testSimpleDeleteStreamByQuery() throws Exception {
{ // Put some "real" docs directly into X...
final UpdateRequest update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER);
for (int i = 1; i <= 42; i++) {
update.add(sdoc("id",i+"x","foo_i",""+i));
}
assertEquals("initial docs in X",
0, update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
}
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
{ // WRITE_X user should be able to delete X via a query from X
final String expr
= "delete("+COLLECTION_X+", batchSize=5, " // note batch size
+ " search("+COLLECTION_X+", "
+ " q=\"foo_i:[* TO 10]\", " // 10 matches = 2 batches
+ " rows=100, "
+ " fl=\"id,foo_i,_version_\", " // foo_i should be ignored...
+ " sort=\"foo_i desc\")) " // version constraint should be ok
;
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(2, tuples.size());
assertEquals(5L, tuples.get(0).get("totalIndexed"));
assertEquals(10L, tuples.get(1).get("totalIndexed"));
}
assertEquals(42L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
public void testSimpleDeleteStreamInvalidCredentials() throws Exception {
assertEquals(0,
(setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.add(sdoc("id", "42"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id=42))"));
// "WRITE" credentials should be required for 'update(...)'
solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
});
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
public void testSimpleDeleteStreamInsufficientCredentials() throws Exception {
assertEquals(0,
(setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.add(sdoc("id", "42"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
// both of these users have valid credentials and authz to read COLLECTION_X, but neither has
// authz to write to X...
for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
"tuple(id=42))"));
solrStream.setCredentials(user, user);
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
});
}
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
public void testIndirectDeleteStream() throws Exception {
{ // Put some "real" docs directly to both X & Y...
final UpdateRequest xxx_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER);
final UpdateRequest yyy_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER);
for (int i = 1; i <= 42; i++) {
xxx_Update.add(sdoc("id",i+"z","foo_i",""+i));
yyy_Update.add(sdoc("id",i+"z","foo_i",""+i));
}
assertEquals("initial docs in X",
0, xxx_Update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
assertEquals("initial docs in Y",
0, yyy_Update.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
}
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
{ // WRITE_X user should be able to delete X via a (dummy) stream from Y...
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
params("qt", "/stream", "expr",
"delete("+COLLECTION_X+",batchSize=1," +
"tuple(id=42z))"));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
}
assertEquals(42L - 1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
{ // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via Y)
final String expr
= "delete("+COLLECTION_X+", batchSize=50, " // note batch size
+ " pruneVersionField=true, " // NOTE: ignoring Y version to del X
+ " search("+COLLECTION_Y+", "
+ " q=\"foo_i:[* TO 10]\", " // 10 matches = 1 batch
+ " rows=100, "
+ " fl=\"id,foo_i,_version_\", " // foo_i & version should be ignored
+ " sort=\"foo_i desc\")) "
;
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y, // NOTE: Y route
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(10L, tuples.get(0).get("batchIndexed"));
assertEquals(10L, tuples.get(0).get("totalIndexed"));
}
assertEquals(42L - 1L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
{ // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via X)...
final String expr
= "delete("+COLLECTION_X+", batchSize=5, " // note batch size
+ " search("+COLLECTION_Y+", "
+ " q=\"foo_i:[30 TO *]\", " // 13 matches = 3 batches
+ " rows=100, "
+ " fl=\"id,foo_i\", " // foo_i should be ignored
+ " sort=\"foo_i desc\")) "
;
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, // NOTE: X route
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(3, tuples.size());
assertEquals( 5L, tuples.get(0).get("batchIndexed"));
assertEquals( 5L, tuples.get(0).get("totalIndexed"));
assertEquals( 5L, tuples.get(1).get("batchIndexed"));
assertEquals(10L, tuples.get(1).get("totalIndexed"));
assertEquals( 3L, tuples.get(2).get("batchIndexed"));
assertEquals(13L, tuples.get(2).get("totalIndexed"));
}
assertEquals(42L - 1L - 10L - (13L - 1L), // '42' in last 13 deletes was already deleted from X
commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
}
public void testIndirectDeleteStreamInsufficientCredentials() throws Exception {
assertEquals(0,
(setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.add(sdoc("id", "42"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
// regardless of how it's routed, WRITE_Y should NOT have authz to delete from X...
for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
params("qt", "/stream", "expr",
"delete("+COLLECTION_X+",batchSize=1," +
"tuple(id=42))"));
solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
});
}
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
}
/**
* Helper method that uses the specified user to (first commit, and then) count the total
* number of documents in the collection

View File

@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -56,8 +57,10 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Assume;
@@ -2696,7 +2699,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
try {
//Copy all docs to destinationCollection
expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
// confirm update() stream defaults to ignoring _version_ field in tuples
expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,_version_,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
stream = new UpdateStream(expression, factory);
stream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(stream);
@@ -4267,6 +4271,155 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
}
}
public void testDeleteStream() throws Exception {
final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
final SolrClient client = cluster.getSolrClient();
{ final UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 20; i++) {
req.add(id, "doc_"+i, "deletable_s", "yup");
}
assertEquals(0, req.commit(cluster.getSolrClient(), COLLECTIONORALIAS).getStatus());
}
// fetch the _version_ value assigned to each doc to test optimistic concurrency later...
final Map<String,Long> versions = new HashMap<>();
{ final QueryResponse allDocs = client.query(COLLECTIONORALIAS, params("q","deletable_s:yup",
"rows","100"));
assertEquals(20L, allDocs.getResults().getNumFound());
for (SolrDocument doc : allDocs.getResults()) {
versions.put(doc.getFirstValue("id").toString(), (Long) doc.getFirstValue("_version_"));
}
}
{ // trivially delete 1 doc
final String expr
= "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+ " delete("+COLLECTIONORALIAS+",batchSize=10, "
+ " tuple(id=doc_2))) "
;
final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
final List<Tuple> tuples = getTuples(stream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
assertEquals(20L - 1L,
client.query(COLLECTIONORALIAS,
params("q","deletable_s:yup")).getResults().getNumFound());
}
{ // delete 5 docs, spread across 3 batches (2 + 2 + 1)
final String expr
= "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+ " delete("+COLLECTIONORALIAS+",batchSize=2,list( " // NOTE: batch size
+ " tuple(id=doc_3), "
+ " tuple(id=doc_11), "
+ " tuple(id=doc_7), "
+ " tuple(id=doc_17), "
+ " tuple(id=doc_15), "
+ " ) ) ) "
;
final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
final List<Tuple> tuples = getTuples(stream);
assertEquals(3, tuples.size());
assertEquals(2L, tuples.get(0).get("totalIndexed"));
assertEquals(4L, tuples.get(1).get("totalIndexed"));
assertEquals(5L, tuples.get(2).get("totalIndexed"));
assertEquals(20L - 1L - 5L,
client.query(COLLECTIONORALIAS,
params("q","deletable_s:yup")).getResults().getNumFound());
}
{ // attempt to delete 2 docs, one with correct version, one with "stale" version that should fail
// but config uses TolerantUpdateProcessorFactory so batch should still be ok...
//
// It would be nice if there was a more explicit, targeted option for update() and delete() to
// ensure that even if one "batch" fails it continues with other batches.
// See TODO in UpdateStream
final long v13_ok = versions.get("doc_13").longValue();
final long v10_bad = versions.get("doc_10").longValue() - 42L;
final String expr
= "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+ " delete("+COLLECTIONORALIAS+",batchSize=10,list( "
+ " tuple(id=doc_10,_version_="+v10_bad+"), "
+ " tuple(id=doc_13,_version_="+v13_ok+"), "
+ " ) ) ) "
;
final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
final List<Tuple> tuples = getTuples(stream);
assertEquals(1, tuples.size());
assertEquals(2L, tuples.get(0).get("totalIndexed"));
// should still be in the index due to version conflict...
assertEquals(1L, client.query(COLLECTIONORALIAS,
params("q","id:doc_10")).getResults().getNumFound());
// should not be in the index due to successful delete...
assertEquals(0L, client.query(COLLECTIONORALIAS,
params("q","id:doc_13")).getResults().getNumFound());
assertEquals(20L - 1L - 5L - 1L,
client.query(COLLECTIONORALIAS,
params("q","deletable_s:yup")).getResults().getNumFound());
}
{ // by using pruneVersionField=true we should be able to ignore optimistic concurrency constraints,
// and delete docs even if the stream we are wrapping returns _version_ values that are no
// longer valid...
final long v10_bad = versions.get("doc_10").longValue() - 42L;
final String expr
= "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+ " delete("+COLLECTIONORALIAS+",batchSize=10, "
+ " pruneVersionField=true, list( "
+ " tuple(id=doc_10,_version_="+v10_bad+"), "
+ " ) ) ) "
;
final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
final List<Tuple> tuples = getTuples(stream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
// _version_ should have been ignored and doc deleted anyway...
assertEquals(0L, client.query(COLLECTIONORALIAS,
params("q","id:doc_10")).getResults().getNumFound());
assertEquals(20L - 1L - 5L - 1L - 1L,
client.query(COLLECTIONORALIAS,
params("q","deletable_s:yup")).getResults().getNumFound());
}
{ // now test a "realistic" DBQ type situation, confirm all (remaining) matching docs deleted...
final String expr
= "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+ " delete("+COLLECTIONORALIAS+",batchSize=99, "
+ " search("+COLLECTIONORALIAS+",qt=\"/export\", "
+ " q=\"deletable_s:yup\", "
+ " sort=\"id asc\",fl=\"id,_version_\" "
+ " ) ) ) "
;
final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
final List<Tuple> tuples = getTuples(stream);
assertEquals(1, tuples.size());
assertEquals(20L - 1L - 5L - 1L - 1L,
tuples.get(0).get("totalIndexed"));
// shouldn't be anything left...
assertEquals(0L,
client.query(COLLECTIONORALIAS,
params("q","deletable_s:yup")).getResults().getNumFound());
}
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
List<Tuple> tuples = new ArrayList<Tuple>();