diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a176c08ee7d..57d53823bfb 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -172,6 +172,8 @@ New Features
 * SOLR-14242: HdfsDirectory now supports indexing geo-points, ranges or shapes. (Adrien Grand)
 
+* SOLR-14241: New delete() Stream Decorator (hossman)
+
 Improvements
 ---------------------
 * SOLR-14120: Define JavaScript methods 'includes' and 'startsWith' to ensure AdminUI can be displayed when using
diff --git a/solr/solr-ref-guide/src/stream-decorator-reference.adoc b/solr/solr-ref-guide/src/stream-decorator-reference.adoc
index ab748846eb8..d759c0b9e54 100644
--- a/solr/solr-ref-guide/src/stream-decorator-reference.adoc
+++ b/solr/solr-ref-guide/src/stream-decorator-reference.adoc
@@ -595,6 +595,45 @@ while(true) {
 daemonStream.close();
 ----
 
+== delete
+
+The `delete` function wraps another function and uses the `id` and `\_version_` values found in the tuples it consumes to send them to a SolrCloud collection as <> commands.
+
+This is similar to the `<<#update,update()>>` function described below.
+
+=== delete Parameters
+
+* `destinationCollection`: (Mandatory) The collection where the tuples will be deleted.
+* `batchSize`: (Mandatory) The indexing batch size.
+* `pruneVersionField`: (Optional, defaults to `false`) Whether to prune `\_version_` values from tuples.
+* `StreamExpression`: (Mandatory)
+
+=== delete Syntax
+
+[source,text]
+----
+ delete(collection1,
+        batchSize=500,
+        search(collection1,
+               q=old_data:true,
+               qt="/export",
+               fl="id",
+               sort="a_f asc, a_i asc"))
+
+----
+
+The example above consumes the tuples returned by the `search` function against `collection1` and converts the `id` value of each document found into a delete request against the same `collection1`.
+
+[NOTE]
+====
+Unlike the `update()` function, `delete()` defaults to `pruneVersionField=false` -- preserving any `\_version_` values found in the inner stream when converting the tuples to "Delete By ID" requests, to ensure that using this stream will not (by default) result in deleting any documents that were updated _after_ the `search(...)` was executed, but _before_ the `delete(...)` processed that tuple (leveraging <> constraints).
+
+Users who wish to ignore concurrent updates, and delete all matched documents, should set `pruneVersionField=true` (or ensure that the inner stream tuples do not include any `\_version_` values).
+
+Users who anticipate concurrent updates, and wish to "skip" any failed deletes, should consider configuring the {solr-javadocs}/solr-core/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.html[`TolerantUpdateProcessorFactory`].
+====
+
+
 == eval
 
 The `eval` function allows for use cases where new streaming expressions are generated on the fly and then evaluated.
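For cases where the optimistic concurrency behavior described in the NOTE above is not wanted, the same expression can be written with `pruneVersionField=true` so that any stale `\_version_` values returned by the inner stream are discarded and every matched document is deleted unconditionally. This is an illustrative sketch reusing the `collection1` and `old_data` names from the syntax example above:

[source,text]
----
 delete(collection1,
        batchSize=500,
        pruneVersionField=true,
        search(collection1,
               q=old_data:true,
               qt="/export",
               fl="id",
               sort="a_f asc, a_i asc"))
----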
@@ -1273,12 +1312,13 @@ unique(
 
 == update
 
-The `update` function wraps another functions and sends the tuples to a SolrCloud collection for indexing.
+The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing as Documents.
 
 === update Parameters
 
 * `destinationCollection`: (Mandatory) The collection where the tuples will indexed.
 * `batchSize`: (Mandatory) The indexing batch size.
+* `pruneVersionField`: (Optional, defaults to `true`) Whether to prune `\_version_` values from tuples.
 * `StreamExpression`: (Mandatory)
 
 === update Syntax
@@ -1296,3 +1336,5 @@ The `update` function wraps another functions and sends the tuples to a SolrClou
 ----
 
 The example above sends the tuples returned by the `search` function to the `destinationCollection` to be indexed.
+
+Wrapping `search(...)` as shown in this example is the common use case for this decorator: to read documents from a collection as tuples, process or modify them in some way, and then add them back to a new collection. For this reason, `pruneVersionField=true` is the default behavior -- stripping any `\_version_` values found in the inner stream when converting the tuples to Solr documents, to prevent any unexpected errors from <> constraints.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 46cdb054814..05ba98f28e1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -39,6 +39,7 @@ public class Lang {
     .withFunctionName("facet", FacetStream.class)
     .withFunctionName("facet2D", Facet2DStream.class)
     .withFunctionName("update", UpdateStream.class)
+    .withFunctionName("delete", DeleteStream.class)
     .withFunctionName("jdbc", JDBCStream.class)
     .withFunctionName("topic", TopicStream.class)
     .withFunctionName("commit", CommitStream.class)
@@ -359,4 +360,4 @@ public class Lang {
         .withFunctionName("if", IfThenElseEvaluator.class)
         .withFunctionName("convert", ConversionEvaluator.class);
   }
-}
\ No newline at end of file
+}
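The registration above is what makes `delete` resolvable when parsing expressions from SolrJ. A minimal sketch of driving it programmatically, assuming a SolrCloud cluster whose ZooKeeper is at `localhost:9983` and a collection named `collection1` (both placeholder values, not part of this patch):

[source,java]
----
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.DeleteStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class DeleteExpressionSketch {
  public static void main(String[] args) throws Exception {
    // Register only the functions this sketch needs, mirroring Lang.java above.
    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", "localhost:9983") // assumed zkHost
        .withFunctionName("search", CloudSolrStream.class)
        .withFunctionName("delete", DeleteStream.class);

    TupleStream stream = factory.constructStream(
        "delete(collection1, batchSize=10,"
        + "       search(collection1, q=\"old_data:true\", qt=\"/export\","
        + "              fl=\"id,_version_\", sort=\"id asc\"))");
    stream.open();
    try {
      for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
        System.out.println(t.get("totalIndexed")); // running count of deletes sent
      }
    } finally {
      stream.close();
    }
  }
}
----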
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
new file mode 100644
index 00000000000..d6ffd550eab
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrInputDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+
+/**
+ * Uses tuples to identify the uniqueKey values of documents to be deleted
+ */
+public final class DeleteStream extends UpdateStream implements Expressible {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String ID_TUPLE_KEY = "id";
+
+  public DeleteStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    final Explanation explanation = super.toExplanation(factory);
+    explanation.setExpression("Delete docs from " + getCollectionName());
+
+    return explanation;
+  }
+
+  /**
+   * {@link DeleteStream} returns false so that Optimistic Concurrency Constraints are
+   * respected by default when using this stream to wrap a {@link SearchStream} query.
+   */
+  @Override
+  protected boolean defaultPruneVersionField() {
+    return false;
+  }
+
+  /**
+   * Overrides implementation to extract the "id" and "_version_"
+   * (if included) from each document and use that information to construct a "Delete By Id" request.
+   * Any other fields (ie: Tuple values) are ignored.
+   */
+  @Override
+  protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
+    if (documentBatch.size() == 0) {
+      return;
+    }
+
+    try {
+      // convert each doc into a deleteById request...
+      final UpdateRequest req = new UpdateRequest();
+      for (SolrInputDocument doc : documentBatch) {
+        final String id = doc.getFieldValue(ID_TUPLE_KEY).toString();
+        final Long version = getVersion(doc);
+        req.deleteById(id, version);
+      }
+      req.process(getCloudSolrClient(), getCollectionName());
+    } catch (SolrServerException | NumberFormatException | IOException e) {
+      log.warn("Unable to delete documents from collection due to unexpected error.", e);
+      String className = e.getClass().getName();
+      String message = e.getMessage();
+      throw new IOException(String.format(Locale.ROOT, "Unexpected error when deleting documents from collection %s - %s:%s", getCollectionName(), className, message));
+    }
+  }
+
+  /**
+   * Helper method that can handle String values when dealing with odd
+   * {@link Tuple} -> {@link SolrInputDocument} conversions
+   * (ie: tuple(..) in tests)
+   */
+  private static Long getVersion(final SolrInputDocument doc) throws NumberFormatException {
+    if (! doc.containsKey(VERSION_FIELD)) {
+      return null;
+    }
+    final Object v = doc.getFieldValue(VERSION_FIELD);
+    if (null == v) {
+      return null;
+    }
+    if (v instanceof Long) {
+      return (Long)v;
+    }
+    return Long.parseLong(v.toString());
+  }
+}
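The `req.deleteById(id, version)` calls built in `uploadBatchToCollection(...)` above are what enforce the optimistic concurrency behavior documented in the ref-guide section. As a standalone sketch of the same SolrJ primitive (the collection name and caller-supplied values here are assumptions for illustration):

[source,java]
----
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;

public class DeleteByIdSketch {
  /** Sketch only: deletes the doc unless it has been updated since `version` was fetched. */
  static void deleteIfUnchanged(SolrClient client, String id, Long version) throws Exception {
    UpdateRequest req = new UpdateRequest();
    // A non-null version makes Solr reject the delete (HTTP 409) when the
    // document's current _version_ no longer matches; passing null deletes
    // unconditionally, which is what pruneVersionField=true amounts to.
    req.deleteById(id, version);
    req.process(client, "collection1"); // "collection1" is a placeholder
  }
}
----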
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 7ea88085940..5313f147703 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -40,10 +40,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
 
 /**
  * Sends tuples emitted by a wrapped {@link TupleStream} as updates to a SolrCloud collection.
@@ -56,6 +56,13 @@ public class UpdateStream extends TupleStream implements Expressible {
   private String collection;
   private String zkHost;
   private int updateBatchSize;
+  /**
+   * Indicates if the {@link CommonParams#VERSION_FIELD} should be removed from tuples when converting
+   * to Solr Documents.
+   * May be set per expression using the "pruneVersionField" named operand,
+   * defaults to the value returned by {@link #defaultPruneVersionField()}
+   */
+  private boolean pruneVersionField;
   private int batchNumber;
   private long totalDocsIndex;
   private PushBackStream tupleSource;
@@ -64,7 +71,6 @@ public class UpdateStream extends TupleStream implements Expressible {
   private List<SolrInputDocument> documentBatch = new ArrayList();
   private String coreName;
 
-
   public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
     String collectionName = factory.getValueOperand(expression, 0);
     verifyCollectionName(collectionName, expression);
@@ -73,6 +79,7 @@
     verifyZkHost(zkHost, collectionName, expression);
     int updateBatchSize = extractBatchSize(expression, factory);
+    pruneVersionField = factory.getBooleanOperand(expression, "pruneVersionField", defaultPruneVersionField());
 
     //Extract underlying TupleStream.
     List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -80,7 +87,6 @@
     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
     StreamExpression sourceStreamExpression = streamExpressions.get(0);
-
     init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, updateBatchSize);
   }
@@ -88,9 +94,10 @@
     if (updateBatchSize <= 0) {
       throw new IOException(String.format(Locale.ROOT,"batchSize '%d' must be greater than 0.", updateBatchSize));
     }
+    pruneVersionField = defaultPruneVersionField();
     init(collectionName, tupleSource, zkHost, updateBatchSize);
   }
-  
+
   private void init(String collectionName, TupleStream tupleSource, String zkHost, int updateBatchSize) {
     this.collection = collectionName;
     this.zkHost = zkHost;
@@ -98,6 +105,11 @@
     this.tupleSource = new PushBackStream(tupleSource);
   }
 
+  /** The name of the collection being updated */
+  protected String getCollectionName() {
+    return collection;
+  }
+
   @Override
   public void open() throws IOException {
     setCloudSolrClient();
@@ -257,6 +269,21 @@
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize '%s' is not a valid integer.",expression, batchSizeStr));
     }
   }
+
+  /**
+   * Used during initialization to specify the default value for the "pruneVersionField" option.
+   * {@link UpdateStream} returns true for backcompat and to simplify slurping of data from one
+   * collection to another.
+   */
+  protected boolean defaultPruneVersionField() {
+    return true;
+  }
+
+  /** Only viable after calling {@link #open} */
+  protected CloudSolrClient getCloudSolrClient() {
+    assert null != this.cloudSolrClient;
+    return this.cloudSolrClient;
+  }
 
   private void setCloudSolrClient() {
     if(this.cache != null) {
@@ -272,7 +299,8 @@
   private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
     SolrInputDocument doc = new SolrInputDocument();
     for (Object field : tuple.fields.keySet()) {
-      if (! field.equals(VERSION_FIELD)) {
+
+      if (! (field.equals(CommonParams.VERSION_FIELD) && pruneVersionField)) {
         Object value = tuple.get(field);
         if (value instanceof List) {
           addMultivaluedField(doc, (String)field, (List)value);
@@ -292,7 +320,11 @@
     }
   }
 
-  private void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
+  /**
+   * This method will be called on every batch of tuples consumed, after converting each tuple
+   * in that batch to a Solr Input Document.
+   */
+  protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
     if (documentBatch.size() == 0) {
       return;
     }
@@ -300,6 +332,12 @@
     try {
       cloudSolrClient.add(collection, documentBatch);
     } catch (SolrServerException | IOException e) {
+      // TODO: it would be nice if there was an option to "skipFailedBatches"
+      // TODO: and just record the batch failure info in the summary tuple for that batch and continue
+      //
+      // TODO: The summary batches (and/or stream error) should also pay attention to the error metadata
+      // from the SolrServerException ... and ideally also any TolerantUpdateProcessor metadata
+
       log.warn("Unable to add documents to collection due to unexpected error.", e);
       String className = e.getClass().getName();
       String message = e.getMessage();
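Making these members `protected` is the refactoring that lets `DeleteStream` (and potentially other decorators) reuse the batching logic while swapping out the upload step. A hypothetical subclass, purely to illustrate the new extension points:

[source,java]
----
import java.io.IOException;
import java.util.List;

import org.apache.solr.client.solrj.io.stream.UpdateStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrInputDocument;

/** Hypothetical subclass: logs each batch before delegating the actual upload. */
public class LoggingUpdateStream extends UpdateStream {

  public LoggingUpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
    super(expression, factory);
  }

  @Override
  protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
    // getCollectionName() is one of the accessors made protected in this patch
    System.err.println("sending " + documentBatch.size() + " docs to " + getCollectionName());
    super.uploadBatchToCollection(documentBatch);
  }
}
----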
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
index 0e13a5a731f..e7e5c1ed773 100644
--- a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
@@ -35,6 +35,11 @@
+
+
+
+
+
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 025f6b23619..459626e27f2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -45,7 +45,7 @@ import org.junit.Test;
 public class TestLang extends SolrTestCase {
 
   private static final String[] allFunctions = {
-      "search", "facet", "facet2D", "update", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
+      "search", "facet", "facet2D", "update", "delete", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
       "unique", "top", "group", "reduce", "parallel", "rollup", "stats", "innerJoin", "leftOuterJoin",
       "hashJoin", "outerHashJoin", "intersect", "complement", "sort", "train", "features", "daemon", "shortestPath",
       "gatherNodes", "nodes",
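The added `solrconfig.xml` lines above lost their XML payload in this copy of the patch. Based on the test comments that reference `TolerantUpdateProcessorFactory`, a typical tolerant update chain for such a test config looks like the following sketch; the chain name and `maxErrors` value are assumptions, not the actual patch content:

[source,xml]
----
<updateRequestProcessorChain name="tolerant-chain" default="true">
  <processor class="solr.TolerantUpdateProcessorFactory">
    <int name="maxErrors">10</int>
  </processor>
  <processor class="solr.LogUpdateProcessorFactory"/>
  <processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
----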
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index b360278b2cb..cb6c2092689 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -246,7 +246,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                  params("qt", "/stream", "expr",
                                                         "update("+COLLECTION_X+",batchSize=1," +
-                                                        "tuple(id='42',a_i=1,b_i=5))"));
+                                                        "tuple(id=42,a_i=1,b_i=5))"));
     solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
     final List<Tuple> tuples = getTuples(solrStream);
     assertEquals(1, tuples.size());
@@ -259,7 +259,7 @@
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                  params("qt", "/stream", "expr",
                                                         "update("+COLLECTION_X+",batchSize=1," +
-                                                        "tuple(id='42',a_i=1,b_i=5))"));
+                                                        "tuple(id=42,a_i=1,b_i=5))"));
 
     // "WRITE" credentials should be required for 'update(...)'
     solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
@@ -278,7 +278,7 @@
       final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                    params("qt", "/stream", "expr",
                                                           "update("+COLLECTION_X+",batchSize=1," +
-                                                          "tuple(id='42',a_i=1,b_i=5))"));
+                                                          "tuple(id=42,a_i=1,b_i=5))"));
 
       solrStream.setCredentials(user, user);
 
@@ -296,7 +296,7 @@
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
                                                  params("qt", "/stream", "expr",
                                                         "update("+COLLECTION_X+",batchSize=1," +
-                                                        "tuple(id='42',a_i=1,b_i=5))"));
+                                                        "tuple(id=42,a_i=1,b_i=5))"));
     solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
     final List<Tuple> tuples = getTuples(solrStream);
     assertEquals(1, tuples.size());
@@ -318,7 +318,7 @@
       + "     search("+COLLECTION_Y+", "
       + "            q=\"foo_i:[* TO 10]\", " // 10 matches = 1 batch
       + "            rows=100, "
-      + "            fl=\"id,foo_i\", "
+      + "            fl=\"id,foo_i,_version_\", " // pruneVersionField default true
       + "            sort=\"foo_i desc\")) "
       ;
 
@@ -370,7 +370,7 @@
       final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
                                                    params("qt", "/stream", "expr",
                                                           "update("+COLLECTION_X+",batchSize=1," +
-                                                          "tuple(id='42',a_i=1,b_i=5))"));
+                                                          "tuple(id=42,a_i=1,b_i=5))"));
       solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
 
       // NOTE: Can't make any assertions about Exception: SOLR-14226
@@ -386,7 +386,7 @@
     final String expr
       = "executor(threads=1, "
       + "         tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
-      + "                              tuple(id='42',a_i=1,b_i=5)) "
+      + "                              tuple(id=42,a_i=1,b_i=5)) "
       + "                       \")) "
       ;
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
@@ -554,7 +554,226 @@
                  0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
   }
 
+
+  public void testSimpleDeleteStream() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+    final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                 params("qt", "/stream", "expr",
+                                                        "delete("+COLLECTION_X+",batchSize=1," +
+                                                        "tuple(id=42))"));
+    solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+    final List<Tuple> tuples = getTuples(solrStream);
+    assertEquals(1, tuples.size());
+    assertEquals(1L, tuples.get(0).get("totalIndexed"));
+
+    assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+  }
+
+  /** A simple "Delete by Query" example */
+  public void testSimpleDeleteStreamByQuery() throws Exception {
+    { // Put some "real" docs directly to X...
+      final UpdateRequest update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER);
+      for (int i = 1; i <= 42; i++) {
+        update.add(sdoc("id",i+"x","foo_i",""+i));
+      }
+      assertEquals("initial docs in X",
+                   0, update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
+    }
+
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+    { // WRITE_X user should be able to delete X via a query from X
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=5, "    // note batch size
+        + "       search("+COLLECTION_X+", "
+        + "              q=\"foo_i:[* TO 10]\", "     // 10 matches = 2 batches
+        + "              rows=100, "
+        + "              fl=\"id,foo_i,_version_\", " // foo_i should be ignored...
+        + "              sort=\"foo_i desc\")) "      // version constraint should be ok
+        ;
+
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(2, tuples.size());
+      assertEquals(5L, tuples.get(0).get("totalIndexed"));
+      assertEquals(10L, tuples.get(1).get("totalIndexed"));
+    }
+
+    assertEquals(42L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
+ + " sort=\"foo_i desc\")) " // version constraint should be ok + ; + + final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, + params("qt", "/stream", + "expr", expr)); + solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER); + final List tuples = getTuples(solrStream); + assertEquals(2, tuples.size()); + assertEquals(5L, tuples.get(0).get("totalIndexed")); + assertEquals(10L, tuples.get(1).get("totalIndexed")); + } + + assertEquals(42L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + } + + public void testSimpleDeleteStreamInvalidCredentials() throws Exception { + assertEquals(0, + (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER) + .add(sdoc("id", "42")) + .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus()); + assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + + final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, + params("qt", "/stream", "expr", + "update("+COLLECTION_X+",batchSize=1," + + "tuple(id=42))")); + // "WRITE" credentials should be required for 'update(...)' + solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD"); + + // NOTE: Can't make any assertions about Exception: SOLR-14226 + expectThrows(Exception.class, () -> { + final List ignored = getTuples(solrStream); + }); + + assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + } + + + public void testSimpleDeleteStreamInsufficientCredentials() throws Exception { + assertEquals(0, + (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER) + .add(sdoc("id", "42")) + .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus()); + assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + + // both of these users have valid credentials and authz read COLLECTION_X, but neither has + // authz to write to X... + for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) { + final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, + params("qt", "/stream", "expr", + "update("+COLLECTION_X+",batchSize=1," + + "tuple(id=42))")); + + solrStream.setCredentials(user, user); + + // NOTE: Can't make any assertions about Exception: SOLR-14226 + expectThrows(Exception.class, () -> { + final List ignored = getTuples(solrStream); + }); + } + + assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + } + + public void testIndirectDeleteStream() throws Exception { + { // Put some "real" docs directly to both X & Y... + final UpdateRequest xxx_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER); + final UpdateRequest yyy_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER); + for (int i = 1; i <= 42; i++) { + xxx_Update.add(sdoc("id",i+"z","foo_i",""+i)); + yyy_Update.add(sdoc("id",i+"z","foo_i",""+i)); + } + assertEquals("initial docs in X", + 0, xxx_Update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus()); + assertEquals("initial docs in Y", + 0, yyy_Update.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus()); + } + + assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER)); + assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER)); + + { // WRITE_X user should be able to delete X via a (dummy) stream from Y... 
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
+                                                   params("qt", "/stream", "expr",
+                                                          "delete("+COLLECTION_X+",batchSize=1," +
+                                                          "tuple(id=42z))"));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+    }
+
+    assertEquals(42L - 1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+
+    { // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via Y)
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=50, "          // note batch size
+        + "       pruneVersionField=true, "                  // NOTE: ignoring Y version to del X
+        + "       search("+COLLECTION_Y+", "
+        + "              q=\"foo_i:[* TO 10]\", "            // 10 matches = 1 batch
+        + "              rows=100, "
+        + "              fl=\"id,foo_i,_version_\", "        // foo_i & version should be ignored
+        + "              sort=\"foo_i desc\")) "
+        ;
+
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y, // NOTE: Y route
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      assertEquals(10L, tuples.get(0).get("batchIndexed"));
+      assertEquals(10L, tuples.get(0).get("totalIndexed"));
+
+    }
+
+    assertEquals(42L - 1L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+
+    { // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via X)...
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=5, "           // note batch size
+        + "       search("+COLLECTION_Y+", "
+        + "              q=\"foo_i:[30 TO *]\", "            // 13 matches = 3 batches
+        + "              rows=100, "
+        + "              fl=\"id,foo_i\", "                  // foo_i should be ignored
+        + "              sort=\"foo_i desc\")) "
+        ;
+
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, // NOTE: X route
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(3, tuples.size());
+
+      assertEquals( 5L, tuples.get(0).get("batchIndexed"));
+      assertEquals( 5L, tuples.get(0).get("totalIndexed"));
+
+      assertEquals( 5L, tuples.get(1).get("batchIndexed"));
+      assertEquals(10L, tuples.get(1).get("totalIndexed"));
+
+      assertEquals( 3L, tuples.get(2).get("batchIndexed"));
+      assertEquals(13L, tuples.get(2).get("totalIndexed"));
+    }
+
+    assertEquals(42L - 1L - 10L - (13L - 1L), // '42' in last 13 deletes was already deleted from X
+                 commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+
+  }
+
+  public void testIndirectDeleteStreamInsufficientCredentials() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+    // regardless of how it's routed, WRITE_Y should NOT have authz to delete from X...
+    for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
+                                                   params("qt", "/stream", "expr",
+                                                          "delete("+COLLECTION_X+",batchSize=1," +
+                                                          "tuple(id=42))"));
+      solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
+
+      // NOTE: Can't make any assertions about Exception: SOLR-14226
+      expectThrows(Exception.class, () -> {
+          final List<Tuple> ignored = getTuples(solrStream);
+        });
+    }
+
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
+
   /**
    * Helper method that uses the specified user to (first commit, and then) count the total
    * number of documents in the collection
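For readers skimming these tests, the recurring pattern is a raw `SolrStream` pointed at the `/stream` handler with basic-auth credentials attached. A condensed sketch of that pattern outside the test harness (the URL, collection, and credentials are placeholder values):

[source,java]
----
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class AuthStreamSketch {
  public static void main(String[] args) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/stream");
    params.set("expr", "delete(collection_x,batchSize=1,tuple(id=42))");

    SolrStream stream = new SolrStream("http://localhost:8983/solr/collection_x", params);
    stream.setCredentials("write_user", "write_password"); // basic-auth, as in the tests
    stream.open();
    try {
      for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
        System.out.println(t.get("totalIndexed"));
      }
    } finally {
      stream.close();
    }
  }
}
----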
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 75474a4f768..52aa3780aa5 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -56,8 +57,10 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Assume;
@@ -2696,7 +2699,8 @@
     try {
       //Copy all docs to destinationCollection
-      expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
+      // confirm update() stream defaults to ignoring _version_ field in tuples
+      expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,_version_,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
       stream = new UpdateStream(expression, factory);
       stream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(stream);
@@ -4267,6 +4271,155 @@
     }
   }
 
+  public void testDeleteStream() throws Exception {
+    final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    final SolrClient client = cluster.getSolrClient();
+
+    { final UpdateRequest req = new UpdateRequest();
+      for (int i = 0; i < 20; i++) {
+        req.add(id, "doc_"+i, "deletable_s", "yup");
+      }
+      assertEquals(0, req.commit(cluster.getSolrClient(), COLLECTIONORALIAS).getStatus());
+    }
+
+    // fetch the _version_ value assigned to each doc to test optimistic concurrency later...
+    final Map<String,Long> versions = new HashMap<>();
+    { final QueryResponse allDocs = client.query(COLLECTIONORALIAS, params("q","deletable_s:yup",
+                                                                           "rows","100"));
+      assertEquals(20L, allDocs.getResults().getNumFound());
+      for (SolrDocument doc : allDocs.getResults()) {
+        versions.put(doc.getFirstValue("id").toString(), (Long) doc.getFirstValue("_version_"));
+      }
+    }
+
+    { // trivially delete 1 doc
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10, "
+        + "              tuple(id=doc_2))) "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+
+      assertEquals(20L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // delete 5 docs, spread across 3 batches (2 + 2 + 1)
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=2,list( " // NOTE: batch size
+        + "              tuple(id=doc_3), "
+        + "              tuple(id=doc_11), "
+        + "              tuple(id=doc_7), "
+        + "              tuple(id=doc_17), "
+        + "              tuple(id=doc_15), "
+        + "       ) ) ) "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(3, tuples.size());
+      assertEquals(2L, tuples.get(0).get("totalIndexed"));
+      assertEquals(4L, tuples.get(1).get("totalIndexed"));
+      assertEquals(5L, tuples.get(2).get("totalIndexed"));
+
+      assertEquals(20L - 1L - 5L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // attempt to delete 2 docs, one with correct version, one with "stale" version that should fail
+      // but config uses TolerantUpdateProcessorFactory so batch should still be ok...
+      //
+      // It would be nice if there was a more explicit, targeted, option for update() and delete() to
+      // ensure that even if one "batch" fails it continues with other batches.
+      // See TODO in UpdateStream
+
+      final long v13_ok = versions.get("doc_13").longValue();
+      final long v10_bad = versions.get("doc_10").longValue() - 42L;
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10,list( "
+        + "              tuple(id=doc_10,_version_="+v10_bad+"), "
+        + "              tuple(id=doc_13,_version_="+v13_ok+"), "
+        + "       ) ) ) "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(2L, tuples.get(0).get("totalIndexed"));
+
+      // should still be in the index due to version conflict...
+      assertEquals(1L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_10")).getResults().getNumFound());
+      // should not be in the index due to successful delete...
+      assertEquals(0L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_13")).getResults().getNumFound());
+
+      assertEquals(20L - 1L - 5L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // by using pruneVersionField=true we should be able to ignore optimistic concurrency constraints,
+      // and delete docs even if the stream we are wrapping returns _version_ values that are no
+      // longer valid...
+      final long v10_bad = versions.get("doc_10").longValue() - 42L;
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10, "
+        + "              pruneVersionField=true, list( "
+        + "              tuple(id=doc_10,_version_="+v10_bad+"), "
+        + "       ) ) ) "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+
+      // _version_ should have been ignored and doc deleted anyway...
+      assertEquals(0L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_10")).getResults().getNumFound());
+
+      assertEquals(20L - 1L - 5L - 1L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // now test a "realistic" DBQ type situation, confirm all (remaining) matching docs deleted...
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true, "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=99, "
+        + "              search("+COLLECTIONORALIAS+",qt=\"/export\", "
+        + "                     q=\"deletable_s:yup\", "
+        + "                     sort=\"id asc\",fl=\"id,_version_\" "
+        + "       ) ) ) "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(20L - 1L - 5L - 1L - 1L,
+                   tuples.get(0).get("totalIndexed"));
+
+      // shouldn't be anything left...
+      assertEquals(0L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+
+    }
+
+  }
+
+
+
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
     List<Tuple> tuples = new ArrayList();