mirror of https://github.com/apache/lucene.git
SOLR-5890: Delete silently fails if not sent to shard where document was
added git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1658486 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5b517c34ac
commit
2eaad8e89b
|
@ -116,6 +116,9 @@ Bug Fixes
|
|||
* SOLR-6775: Creating backup snapshot results in null pointer exception.
|
||||
(Ryan Hesson, Varun Thacker via shalin)
|
||||
|
||||
* SOLR-5890: Delete silently fails if not sent to shard where document was
|
||||
added (Ishan Chattopadhyaya, Noble Paul)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
* SOLR-7049: Move work done by the LIST Collections API call to the Collections
|
||||
|
|
|
@ -113,7 +113,7 @@ public class SolrCloudPartitioner extends Partitioner<Text, SolrInputDocumentWri
|
|||
String keyStr = key.toString();
|
||||
|
||||
// TODO: scalability: replace linear search in HashBasedRouter.hashToSlice() with binary search on sorted hash ranges
|
||||
Slice slice = docRouter.getTargetSlice(keyStr, doc, emptySolrParams, docCollection);
|
||||
Slice slice = docRouter.getTargetSlice(keyStr, doc, null, emptySolrParams, docCollection);
|
||||
|
||||
// LOG.info("slice: {}", slice);
|
||||
if (slice == null) {
|
||||
|
|
|
@ -382,7 +382,7 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
|
||||
Map<String, List<String>> sliceToId = new HashMap<>();
|
||||
for (String id : allIds) {
|
||||
Slice slice = coll.getRouter().getTargetSlice(id, null, params, coll);
|
||||
Slice slice = coll.getRouter().getTargetSlice(id, null, null, params, coll);
|
||||
|
||||
List<String> idsForShard = sliceToId.get(slice.getName());
|
||||
if (idsForShard == null) {
|
||||
|
|
|
@ -139,6 +139,12 @@ public class JavabinLoader extends ContentStreamLoader {
|
|||
delcmd.setVersion(version);
|
||||
}
|
||||
}
|
||||
if (map != null) {
|
||||
String route = (String) map.get(UpdateRequest.ROUTE);
|
||||
if (route != null) {
|
||||
delcmd.setRoute(route);
|
||||
}
|
||||
}
|
||||
processor.processDelete(delcmd);
|
||||
delcmd.clear();
|
||||
}
|
||||
|
|
|
@ -371,6 +371,8 @@ public class JsonLoader extends ContentStreamLoader {
|
|||
cmd.commitWithin = (int)parser.getLong();
|
||||
} else if( "_version_".equals(key) ) {
|
||||
cmd.setVersion(parser.getLong());
|
||||
} else if ("_route_".equals(key)) {
|
||||
cmd.setRoute(parser.getString());
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown key: "+key+" ["+parser.getPosition()+"]" );
|
||||
}
|
||||
|
|
|
@ -17,7 +17,9 @@ package org.apache.solr.handler.loader;
|
|||
*/
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
|
@ -56,6 +58,7 @@ import javax.xml.transform.TransformerException;
|
|||
import javax.xml.transform.dom.DOMResult;
|
||||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.transform.sax.SAXSource;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -328,6 +331,9 @@ public class XMLLoader extends ContentStreamLoader {
|
|||
if (UpdateRequestHandler.VERSION.equals(attrName)) {
|
||||
deleteCmd.setVersion(Long.parseLong(attrVal));
|
||||
}
|
||||
if (UpdateRequest.ROUTE.equals(attrName)) {
|
||||
deleteCmd.setRoute(attrVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -108,6 +108,8 @@ public class DeleteUpdateCommand extends UpdateCommand {
|
|||
if (indexedId!=null) sb.append(",indexedId=").append(getId());
|
||||
if (query != null) sb.append(",query=`").append(query).append('`');
|
||||
sb.append(",commitWithin=").append(commitWithin);
|
||||
if (route != null)
|
||||
sb.append(",_route_=").append(route);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
@ -176,8 +176,9 @@ public class SolrCmdDistributor {
|
|||
for (Node node : nodes) {
|
||||
UpdateRequest uReq = new UpdateRequest();
|
||||
uReq.setParams(params);
|
||||
uReq.setCommitWithin(cmd.commitWithin);
|
||||
if (cmd.isDeleteById()) {
|
||||
uReq.deleteById(cmd.getId(), cmd.getVersion());
|
||||
uReq.deleteById(cmd.getId(), cmd.getRoute(), cmd.getVersion());
|
||||
} else {
|
||||
uReq.deleteByQuery(cmd.query);
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.solr.request.SolrQueryRequest;
|
|||
public abstract class UpdateCommand implements Cloneable {
|
||||
protected SolrQueryRequest req;
|
||||
protected long version;
|
||||
protected String route;
|
||||
protected int flags;
|
||||
|
||||
public static int BUFFERING = 0x00000001; // update command is being buffered.
|
||||
|
@ -65,6 +66,14 @@ public abstract class UpdateCommand implements Cloneable {
|
|||
this.version = version;
|
||||
}
|
||||
|
||||
public String getRoute() {
|
||||
return route;
|
||||
}
|
||||
|
||||
public void setRoute (String route) {
|
||||
this.route = route;
|
||||
}
|
||||
|
||||
public void setFlags(int flags) {
|
||||
this.flags = flags;
|
||||
}
|
||||
|
|
|
@ -309,8 +309,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private List<Node> setupRequest(String id, SolrInputDocument doc) {
|
||||
return setupRequest(id, doc, null);
|
||||
}
|
||||
|
||||
private List<Node> setupRequest(String id, SolrInputDocument doc, String route) {
|
||||
List<Node> nodes = null;
|
||||
|
||||
// if we are in zk mode...
|
||||
|
@ -324,7 +327,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
ClusterState cstate = zkController.getClusterState();
|
||||
DocCollection coll = cstate.getCollection(collection);
|
||||
Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
|
||||
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
|
||||
|
||||
if (slice == null) {
|
||||
// No slice found. Most strict routers will have already thrown an exception, so a null return is
|
||||
|
@ -1136,7 +1139,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
if (zkEnabled) {
|
||||
zkCheck();
|
||||
nodes = setupRequest(cmd.getId(), null);
|
||||
nodes = setupRequest(cmd.getId(), null, cmd.getRoute());
|
||||
} else {
|
||||
isLeader = getNonZkLeaderAssumption(req);
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.apache.solr.client.solrj.SolrQuery;
|
|||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
@ -35,6 +37,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
|
|||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.update.VersionInfo;
|
||||
import org.apache.solr.update.processor.DistributedUpdateProcessor;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
|
@ -44,6 +47,7 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Super basic testing, no shard restarting or anything.
|
||||
|
@ -136,13 +140,269 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
|
|||
// TODO: testOptimisticUpdate(results);
|
||||
|
||||
testDeleteByQueryDistrib();
|
||||
|
||||
|
||||
testDeleteByIdImplicitRouter();
|
||||
|
||||
testDeleteByIdCompositeRouterWithRouterField();
|
||||
|
||||
docId = testThatCantForwardToLeaderFails(docId);
|
||||
|
||||
|
||||
|
||||
|
||||
docId = testIndexingBatchPerRequestWithHttpSolrClient(docId);
|
||||
}
|
||||
|
||||
private void testDeleteByIdImplicitRouter() throws Exception {
|
||||
SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
CollectionAdminResponse response;
|
||||
Map<String, NamedList<Integer>> coresStatus;
|
||||
|
||||
CollectionAdminRequest.Create createCollectionRequest = new CollectionAdminRequest.Create();
|
||||
createCollectionRequest.setCollectionName("implicit_collection_without_routerfield");
|
||||
createCollectionRequest.setRouterName("implicit");
|
||||
createCollectionRequest.setNumShards(2);
|
||||
createCollectionRequest.setShards("shard1,shard2");
|
||||
createCollectionRequest.setReplicationFactor(2);
|
||||
createCollectionRequest.setConfigName("conf1");
|
||||
response = createCollectionRequest.process(server);
|
||||
|
||||
assertEquals(0, response.getStatus());
|
||||
assertTrue(response.isSuccess());
|
||||
coresStatus = response.getCollectionCoresStatus();
|
||||
assertEquals(4, coresStatus.size());
|
||||
for (int i = 0; i < 4; i++) {
|
||||
NamedList<Integer> status = coresStatus.get("implicit_collection_without_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
|
||||
assertEquals(0, (int) status.get("status"));
|
||||
assertTrue(status.get("QTime") > 0);
|
||||
}
|
||||
|
||||
SolrClient shard1 = createNewSolrClient("implicit_collection_without_routerfield_shard1_replica1",
|
||||
getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
SolrClient shard2 = createNewSolrClient("implicit_collection_without_routerfield_shard2_replica1",
|
||||
getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
int docCounts1, docCounts2;
|
||||
|
||||
// Add three documents to shard1
|
||||
doc.clear();
|
||||
doc.addField("id", "1");
|
||||
doc.addField("title", "s1 one");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "2");
|
||||
doc.addField("title", "s1 two");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "3");
|
||||
doc.addField("title", "s1 three");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
docCounts1 = 3; // Three documents in shard1
|
||||
|
||||
// Add two documents to shard2
|
||||
doc.clear();
|
||||
doc.addField("id", "4");
|
||||
doc.addField("title", "s2 four");
|
||||
shard2.add(doc);
|
||||
shard2.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "5");
|
||||
doc.addField("title", "s2 five");
|
||||
shard2.add(doc);
|
||||
shard2.commit();
|
||||
|
||||
docCounts2 = 2; // Two documents in shard2
|
||||
|
||||
// Verify the documents were added to correct shards
|
||||
ModifiableSolrParams query = new ModifiableSolrParams();
|
||||
query.set("q", "*:*");
|
||||
QueryResponse respAll = shard1.query(query);
|
||||
assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
|
||||
|
||||
query.set("shards", "shard1");
|
||||
QueryResponse resp1 = shard1.query(query);
|
||||
assertEquals(docCounts1, resp1.getResults().getNumFound());
|
||||
|
||||
query.set("shards", "shard2");
|
||||
QueryResponse resp2 = shard2.query(query);
|
||||
assertEquals(docCounts2, resp2.getResults().getNumFound());
|
||||
|
||||
|
||||
// Delete a document in shard2 with update to shard1, with _route_ param
|
||||
// Should delete.
|
||||
UpdateRequest deleteRequest = new UpdateRequest();
|
||||
deleteRequest.deleteById("4", "shard2");
|
||||
shard1.request(deleteRequest);
|
||||
shard1.commit();
|
||||
query.set("shards", "shard2");
|
||||
resp2 = shard2.query(query);
|
||||
assertEquals(--docCounts2, resp2.getResults().getNumFound());
|
||||
|
||||
// Delete a document in shard2 with update to shard1, without _route_ param
|
||||
// Shouldn't delete, since deleteById requests are not broadcast to all shard leaders.
|
||||
deleteRequest = new UpdateRequest();
|
||||
deleteRequest.deleteById("5");
|
||||
shard1.request(deleteRequest);
|
||||
shard1.commit();
|
||||
query.set("shards", "shard2");
|
||||
resp2 = shard2.query(query);
|
||||
assertEquals(docCounts2, resp2.getResults().getNumFound());
|
||||
|
||||
// Multiple deleteById commands in a single request
|
||||
deleteRequest.clear();
|
||||
deleteRequest.deleteById("2", "shard1");
|
||||
deleteRequest.deleteById("3", "shard1");
|
||||
deleteRequest.setCommitWithin(1);
|
||||
query.set("shards", "shard1");
|
||||
shard2.request(deleteRequest);
|
||||
resp1 = shard1.query(query);
|
||||
--docCounts1;
|
||||
--docCounts1;
|
||||
assertEquals(docCounts1, resp1.getResults().getNumFound());
|
||||
|
||||
// Test commitWithin, update to shard2, document deleted in shard1
|
||||
deleteRequest.clear();
|
||||
deleteRequest.deleteById("1", "shard1");
|
||||
deleteRequest.setCommitWithin(1);
|
||||
shard2.request(deleteRequest);
|
||||
Thread.sleep(1000);
|
||||
query.set("shards", "shard1");
|
||||
resp1 = shard1.query(query);
|
||||
assertEquals(--docCounts1, resp1.getResults().getNumFound());
|
||||
}
|
||||
|
||||
private void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
|
||||
SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
CollectionAdminResponse response;
|
||||
Map<String, NamedList<Integer>> coresStatus;
|
||||
|
||||
CollectionAdminRequest.Create createCollectionRequest = new CollectionAdminRequest.Create();
|
||||
createCollectionRequest.setCollectionName("compositeid_collection_with_routerfield");
|
||||
createCollectionRequest.setRouterName("compositeId");
|
||||
createCollectionRequest.setRouterField("routefield_s");
|
||||
createCollectionRequest.setNumShards(2);
|
||||
createCollectionRequest.setShards("shard1,shard2");
|
||||
createCollectionRequest.setReplicationFactor(2);
|
||||
createCollectionRequest.setConfigName("conf1");
|
||||
response = createCollectionRequest.process(server);
|
||||
|
||||
assertEquals(0, response.getStatus());
|
||||
assertTrue(response.isSuccess());
|
||||
coresStatus = response.getCollectionCoresStatus();
|
||||
assertEquals(4, coresStatus.size());
|
||||
for (int i = 0; i < 4; i++) {
|
||||
NamedList<Integer> status = coresStatus.get("compositeid_collection_with_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
|
||||
assertEquals(0, (int) status.get("status"));
|
||||
assertTrue(status.get("QTime") > 0);
|
||||
}
|
||||
|
||||
SolrClient shard1 = createNewSolrClient("compositeid_collection_with_routerfield_shard1_replica1",
|
||||
getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
SolrClient shard2 = createNewSolrClient("compositeid_collection_with_routerfield_shard2_replica1",
|
||||
getBaseUrl((HttpSolrClient) clients.get(0)));
|
||||
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
int docCounts1 = 0, docCounts2 = 0;
|
||||
|
||||
// Add three documents to shard1
|
||||
doc.clear();
|
||||
doc.addField("id", "1");
|
||||
doc.addField("title", "s1 one");
|
||||
doc.addField("routefield_s", "europe");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "2");
|
||||
doc.addField("title", "s1 two");
|
||||
doc.addField("routefield_s", "europe");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "3");
|
||||
doc.addField("title", "s1 three");
|
||||
doc.addField("routefield_s", "europe");
|
||||
shard1.add(doc);
|
||||
shard1.commit();
|
||||
|
||||
docCounts1 = 3; // Three documents in shard1
|
||||
|
||||
// Add two documents to shard2
|
||||
doc.clear();
|
||||
doc.addField("id", "4");
|
||||
doc.addField("title", "s2 four");
|
||||
doc.addField("routefield_s", "africa");
|
||||
shard2.add(doc);
|
||||
//shard2.commit();
|
||||
|
||||
doc.clear();
|
||||
doc.addField("id", "5");
|
||||
doc.addField("title", "s2 five");
|
||||
doc.addField("routefield_s", "africa");
|
||||
shard2.add(doc);
|
||||
shard2.commit();
|
||||
|
||||
docCounts2 = 2; // Two documents in shard2
|
||||
|
||||
// Verify the documents were added to correct shards
|
||||
ModifiableSolrParams query = new ModifiableSolrParams();
|
||||
query.set("q", "*:*");
|
||||
QueryResponse respAll = shard1.query(query);
|
||||
assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
|
||||
|
||||
query.set("shards", "shard1");
|
||||
QueryResponse resp1 = shard1.query(query);
|
||||
assertEquals(docCounts1, resp1.getResults().getNumFound());
|
||||
|
||||
query.set("shards", "shard2");
|
||||
QueryResponse resp2 = shard2.query(query);
|
||||
assertEquals(docCounts2, resp2.getResults().getNumFound());
|
||||
|
||||
// Delete a document in shard2 with update to shard1, with _route_ param
|
||||
// Should delete.
|
||||
UpdateRequest deleteRequest = new UpdateRequest();
|
||||
deleteRequest.deleteById("4", "africa");
|
||||
deleteRequest.setCommitWithin(1);
|
||||
shard1.request(deleteRequest);
|
||||
shard1.commit();
|
||||
|
||||
query.set("shards", "shard2");
|
||||
resp2 = shard2.query(query);
|
||||
--docCounts2;
|
||||
assertEquals(docCounts2, resp2.getResults().getNumFound());
|
||||
|
||||
// Multiple deleteById commands in a single request
|
||||
deleteRequest.clear();
|
||||
deleteRequest.deleteById("2", "europe");
|
||||
deleteRequest.deleteById("3", "europe");
|
||||
deleteRequest.setCommitWithin(1);
|
||||
query.set("shards", "shard1");
|
||||
shard1.request(deleteRequest);
|
||||
shard1.commit();
|
||||
Thread.sleep(1000);
|
||||
resp1 = shard1.query(query);
|
||||
--docCounts1;
|
||||
--docCounts1;
|
||||
assertEquals(docCounts1, resp1.getResults().getNumFound());
|
||||
|
||||
// Test commitWithin, update to shard2, document deleted in shard1
|
||||
deleteRequest.clear();
|
||||
deleteRequest.deleteById("1", "europe");
|
||||
deleteRequest.setCommitWithin(1);
|
||||
shard2.request(deleteRequest);
|
||||
query.set("shards", "shard1");
|
||||
resp1 = shard1.query(query);
|
||||
--docCounts1;
|
||||
assertEquals(docCounts1, resp1.getResults().getNumFound());
|
||||
}
|
||||
|
||||
private long testThatCantForwardToLeaderFails(long docId) throws Exception {
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
ZkNodeProps props = zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard1");
|
||||
|
|
|
@ -137,7 +137,7 @@ public class TestHashPartitioner extends SolrTestCaseJ4 {
|
|||
|
||||
public void doIndex(DocCollection coll, String id, String expectedShard) {
|
||||
DocRouter router = coll.getRouter();
|
||||
Slice target = router.getTargetSlice(id, null, null, coll);
|
||||
Slice target = router.getTargetSlice(id, null, null, null, coll);
|
||||
assertEquals(expectedShard, target.getName());
|
||||
}
|
||||
|
||||
|
@ -206,7 +206,7 @@ public class TestHashPartitioner extends SolrTestCaseJ4 {
|
|||
"A!B/-5", "!/130!", "!!A/1000", "A//8!B///10!C////" };
|
||||
for (int i = 0 ; i < ids.length ; ++i) {
|
||||
try {
|
||||
Slice targetSlice = coll.getRouter().getTargetSlice(ids[i], null, null, coll);
|
||||
Slice targetSlice = coll.getRouter().getTargetSlice(ids[i], null, null, null, coll);
|
||||
assertNotNull(targetSlice);
|
||||
} catch (Exception e) {
|
||||
throw new Exception("Exception routing id '" + ids[i] + "'", e);
|
||||
|
@ -238,7 +238,7 @@ public class TestHashPartitioner extends SolrTestCaseJ4 {
|
|||
}
|
||||
String id = idBuilder.toString();
|
||||
try {
|
||||
Slice targetSlice = router.getTargetSlice(id, null, null, coll);
|
||||
Slice targetSlice = router.getTargetSlice(id, null, null, null, coll);
|
||||
assertNotNull(targetSlice);
|
||||
} catch (Exception e) {
|
||||
throw new Exception("Exception routing id '" + id + "'", e);
|
||||
|
|
|
@ -589,6 +589,7 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
|
|||
+"\n ,'delete':['30','40']"
|
||||
+"\n ,'delete':{'id':50, '_version_':12345}"
|
||||
+"\n ,'delete':[{'id':60, '_version_':67890}, {'id':70, '_version_':77777}, {'query':'id:80', '_version_':88888}]"
|
||||
+"\n ,'delete':{'id':90, '_route_':'shard1', '_version_':88888}"
|
||||
+ "\n}\n";
|
||||
str = str.replace('\'', '"');
|
||||
SolrQueryRequest req = req();
|
||||
|
@ -598,7 +599,7 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
|
|||
loader.load(req, rsp, new ContentStreamBase.StringStream(str), p);
|
||||
|
||||
// DELETE COMMANDS
|
||||
assertEquals( 8, p.deleteCommands.size() );
|
||||
assertEquals( 9, p.deleteCommands.size() );
|
||||
DeleteUpdateCommand delete = p.deleteCommands.get( 0 );
|
||||
assertEquals( delete.id, "10" );
|
||||
assertEquals( delete.query, null );
|
||||
|
@ -637,7 +638,13 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
|
|||
delete = p.deleteCommands.get( 7 );
|
||||
assertEquals( delete.id, null );
|
||||
assertEquals( delete.query, "id:80" );
|
||||
assertEquals( delete.getVersion(), 88888L);
|
||||
assertEquals(delete.getVersion(), 88888L);
|
||||
|
||||
delete = p.deleteCommands.get(8);
|
||||
assertEquals(delete.id, "90");
|
||||
assertEquals(delete.query, null);
|
||||
assertEquals(delete.getRoute(), "shard1");
|
||||
assertEquals(delete.getVersion(), 88888L);
|
||||
|
||||
req.close();
|
||||
}
|
||||
|
|
|
@ -173,15 +173,27 @@ public class XmlUpdateRequestHandlerTest extends SolrTestCaseJ4 {
|
|||
" <delete>" +
|
||||
" <id>150</id>" +
|
||||
" </delete>" +
|
||||
" <delete>" +
|
||||
" <id version=\"42\">300</id>" +
|
||||
" </delete>" +
|
||||
" <delete>" +
|
||||
" <id _route_=\"shard1\">400</id>" +
|
||||
" </delete>" +
|
||||
" <delete>" +
|
||||
" <id _route_=\"shard1\" version=\"42\">500</id>" +
|
||||
" </delete>" +
|
||||
"</update>";
|
||||
|
||||
MockUpdateRequestProcessor p = new MockUpdateRequestProcessor(null);
|
||||
p.expectDelete(null, "id:150", -1);
|
||||
p.expectDelete("150", null, -1);
|
||||
p.expectDelete("200", null, -1);
|
||||
p.expectDelete(null, "id:200", -1);
|
||||
p.expectDelete(null, "id:150", 500);
|
||||
p.expectDelete("150", null, -1);
|
||||
p.expectDelete(null, "id:150", -1, 0, null);
|
||||
p.expectDelete("150", null, -1, 0, null);
|
||||
p.expectDelete("200", null, -1, 0, null);
|
||||
p.expectDelete(null, "id:200", -1, 0, null);
|
||||
p.expectDelete(null, "id:150", 500, 0, null);
|
||||
p.expectDelete("150", null, -1, 0, null);
|
||||
p.expectDelete("300", null, -1, 42, null);
|
||||
p.expectDelete("400", null, -1, 0, "shard1");
|
||||
p.expectDelete("500", null, -1, 42, "shard1");
|
||||
|
||||
XMLLoader loader = new XMLLoader().init(null);
|
||||
loader.load(req(), new SolrQueryResponse(), new ContentStreamBase.StringStream(xml), p);
|
||||
|
@ -197,11 +209,15 @@ public class XmlUpdateRequestHandlerTest extends SolrTestCaseJ4 {
|
|||
super(next);
|
||||
}
|
||||
|
||||
public void expectDelete(String id, String query, int commitWithin) {
|
||||
public void expectDelete(String id, String query, int commitWithin, long version, String route) {
|
||||
DeleteUpdateCommand cmd = new DeleteUpdateCommand(null);
|
||||
cmd.id = id;
|
||||
cmd.query = query;
|
||||
cmd.commitWithin = commitWithin;
|
||||
if (version!=0)
|
||||
cmd.setVersion(version);
|
||||
if (route!=null)
|
||||
cmd.setRoute(route);
|
||||
deleteCommands.add(cmd);
|
||||
}
|
||||
|
||||
|
@ -216,7 +232,8 @@ public class XmlUpdateRequestHandlerTest extends SolrTestCaseJ4 {
|
|||
assertTrue("Expected [" + expected + "] but found [" + cmd + "]",
|
||||
ObjectUtils.equals(expected.id, cmd.id) &&
|
||||
ObjectUtils.equals(expected.query, cmd.query) &&
|
||||
expected.commitWithin==cmd.commitWithin);
|
||||
expected.commitWithin==cmd.commitWithin &&
|
||||
ObjectUtils.equals(expected.getRoute(), cmd.getRoute()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -206,6 +206,9 @@ public class JavaBinUpdateRequestCodec {
|
|||
Map<String,Object> params = entry.getValue();
|
||||
if (params != null) {
|
||||
Long version = (Long) params.get(UpdateRequest.VER);
|
||||
if (params.containsKey(UpdateRequest.ROUTE))
|
||||
updateRequest.deleteById(entry.getKey(), (String) params.get(UpdateRequest.ROUTE));
|
||||
else
|
||||
updateRequest.deleteById(entry.getKey(), version);
|
||||
} else {
|
||||
updateRequest.deleteById(entry.getKey());
|
||||
|
|
|
@ -49,6 +49,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
public static final String REPFACT = "rf";
|
||||
public static final String MIN_REPFACT = "min_rf";
|
||||
public static final String VER = "ver";
|
||||
public static final String ROUTE = "_route_";
|
||||
public static final String OVERWRITE = "ow";
|
||||
public static final String COMMIT_WITHIN = "cw";
|
||||
private Map<SolrInputDocument,Map<String,Object>> documents = null;
|
||||
|
@ -132,7 +133,25 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
deleteById.put(id, null);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public UpdateRequest deleteById(String id, String route) {
|
||||
return deleteById(id, route, null);
|
||||
}
|
||||
|
||||
public UpdateRequest deleteById(String id, String route, Long version) {
|
||||
if (deleteById == null) {
|
||||
deleteById = new LinkedHashMap<>();
|
||||
}
|
||||
Map<String, Object> params = (route == null && version == null) ? null : new HashMap<>(1);
|
||||
if (version != null)
|
||||
params.put(VER, version);
|
||||
if (route != null)
|
||||
params.put(ROUTE, route);
|
||||
deleteById.put(id, params);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public UpdateRequest deleteById(List<String> ids) {
|
||||
if (deleteById == null) {
|
||||
deleteById = new LinkedHashMap<>();
|
||||
|
@ -146,13 +165,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
}
|
||||
|
||||
public UpdateRequest deleteById(String id, Long version) {
|
||||
if (deleteById == null) {
|
||||
deleteById = new LinkedHashMap<>();
|
||||
}
|
||||
Map<String,Object> params = new HashMap<>(1);
|
||||
params.put(VER, version);
|
||||
deleteById.put(id, params);
|
||||
return this;
|
||||
return deleteById(id, null, version);
|
||||
}
|
||||
|
||||
public UpdateRequest deleteByQuery(String q) {
|
||||
|
@ -190,7 +203,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return null;
|
||||
}
|
||||
Slice slice = router.getTargetSlice(id
|
||||
.toString(), doc, null, col);
|
||||
.toString(), doc, null, null, col);
|
||||
if (slice == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -237,7 +250,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
if (map != null) {
|
||||
version = (Long) map.get(VER);
|
||||
}
|
||||
Slice slice = router.getTargetSlice(deleteId, null, null, col);
|
||||
Slice slice = router.getTargetSlice(deleteId, null, null, null, col);
|
||||
if (slice == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -384,9 +397,14 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
Map<String,Object> map = entry.getValue();
|
||||
if (map != null) {
|
||||
Long version = (Long) map.get(VER);
|
||||
String route = (String)map.get(ROUTE);
|
||||
if (version != null) {
|
||||
writer.append(" version=\"" + version + "\"");
|
||||
}
|
||||
|
||||
if (route != null) {
|
||||
writer.append(" _route_=\"" + route + "\"");
|
||||
}
|
||||
}
|
||||
writer.append(">");
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ public abstract class DocRouter {
|
|||
}
|
||||
|
||||
/** Returns the Slice that the document should reside on, or null if there is not enough information */
|
||||
public abstract Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection);
|
||||
public abstract Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection);
|
||||
|
||||
/** This method is consulted to determine what slices should be queried for a request when
|
||||
* an explicit shards parameter was not used.
|
||||
|
|
|
@ -28,9 +28,14 @@ import java.util.Collections;
|
|||
public abstract class HashBasedRouter extends DocRouter {
|
||||
|
||||
@Override
|
||||
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
||||
if (id == null) id = getId(sdoc, params);
|
||||
int hash = sliceHash(id, sdoc, params,collection);
|
||||
public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) {
|
||||
int hash;
|
||||
if (route != null) {
|
||||
hash = sliceHash(route, sdoc, params, collection);
|
||||
} else {
|
||||
if (id == null) id = getId(sdoc, params);
|
||||
hash = sliceHash(id, sdoc, params, collection);
|
||||
}
|
||||
return hashToSlice(hash, collection);
|
||||
}
|
||||
|
||||
|
@ -70,7 +75,7 @@ public abstract class HashBasedRouter extends DocRouter {
|
|||
}
|
||||
|
||||
// use the shardKey as an id for plain hashing
|
||||
Slice slice = getTargetSlice(shardKey, null, params, collection);
|
||||
Slice slice = getTargetSlice(shardKey, null, null, params, collection);
|
||||
return slice == null ? Collections.<Slice>emptyList() : Collections.singletonList(slice);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,9 +38,12 @@ public class ImplicitDocRouter extends DocRouter {
|
|||
.getLogger(ImplicitDocRouter.class);
|
||||
|
||||
@Override
|
||||
public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
|
||||
public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) {
|
||||
String shard = null;
|
||||
if (sdoc != null) {
|
||||
|
||||
if (route != null) // if a route is already passed in, try to use it
|
||||
shard = route;
|
||||
else if (sdoc != null) {
|
||||
String f = getRouteField(collection);
|
||||
if(f !=null) {
|
||||
Object o = sdoc.getFieldValue(f);
|
||||
|
|
Loading…
Reference in New Issue