diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 55f794f88ff..e7ac7aec302 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -226,6 +226,9 @@ Bug Fixes
 
 * SOLR-4853: Fixed SolrJettyTestBase so it may be reused by end users (hossman)
 
+* SOLR-4744: Update failure on sub shard is not propagated to clients by parent
+  shard (Anshum Gupta, yonik, shalin)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 6bec6d43c77..766755c11eb 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -114,11 +114,11 @@ public class SolrCmdDistributor {
 
     // make sure any pending deletes are flushed
     flushDeletes(1);
-    
+
     // TODO: this is brittle
     // need to make a clone since these commands may be reused
     AddUpdateCommand clone = new AddUpdateCommand(null);
-    
+
     clone.solrDoc = cmd.solrDoc;
     clone.commitWithin = cmd.commitWithin;
     clone.overwrite = cmd.overwrite;
@@ -135,10 +135,79 @@ public class SolrCmdDistributor {
       }
       alist.add(addRequest);
     }
-    
+
     flushAdds(maxBufferedAddsPerServer);
   }
-  
+
+  /**
+   * Synchronous (blocking) add to the specified node. Any error returned from the node is propagated.
+   */
+  public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException {
+    log.info("SYNCADD on {} : {}", node, cmd.getPrintableId());
+    checkResponses(false);
+    // flush all pending deletes
+    flushDeletes(1);
+    // flush all pending adds
+    flushAdds(1);
+    // finish with the pending requests
+    checkResponses(false);
+
+    UpdateRequestExt ureq = new UpdateRequestExt();
+    ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+    ureq.setParams(params);
+    syncRequest(node, ureq);
+  }
+
+  public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+    log.info("SYNCDELETE on {} : {}", nodes, cmd);
+    checkResponses(false);
+    // flush all pending adds
+    flushAdds(1);
+    // flush all pending deletes
+    flushDeletes(1);
+    // finish pending requests
+    checkResponses(false);
+
+    DeleteUpdateCommand clonedCmd = clone(cmd);
+    DeleteRequest deleteRequest = new DeleteRequest();
+    deleteRequest.cmd = clonedCmd;
+    deleteRequest.params = params;
+
+    UpdateRequestExt ureq = new UpdateRequestExt();
+    if (cmd.isDeleteById()) {
+      ureq.deleteById(cmd.getId(), cmd.getVersion());
+    } else {
+      ureq.deleteByQuery(cmd.query);
+    }
+    ureq.setParams(params);
+    for (Node node : nodes) {
+      syncRequest(node, ureq);
+    }
+  }
+
+  private void syncRequest(Node node, UpdateRequestExt ureq) {
+    Request sreq = new Request();
+    sreq.node = node;
+    sreq.ureq = ureq;
+
+    String url = node.getUrl();
+    String fullUrl;
+    if (!url.startsWith("http://") && !url.startsWith("https://")) {
+      fullUrl = "http://" + url;
+    } else {
+      fullUrl = url;
+    }
+
+    HttpSolrServer server = new HttpSolrServer(fullUrl,
+        updateShardHandler.getHttpClient());
+
+    try {
+      sreq.ursp = server.request(ureq);
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node, e);
+    }
+  }
+
   public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
       ModifiableSolrParams params) throws IOException {
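A note on the new methods above: unlike the buffered `distribAdd` path, where failures only surface in a later `checkResponses()` pass, `syncRequest` blocks on `server.request(ureq)` and rethrows immediately, which is what lets the parent shard leader propagate a sub-shard failure back to the client (the point of SOLR-4744). A minimal standalone sketch of that fail-fast pattern; `ShardClient` is illustrative, not Solr API:

```java
import java.io.IOException;
import java.util.List;

public class SyncForwardSketch {
  /** Stand-in for an HTTP client bound to one sub-shard leader (hypothetical). */
  interface ShardClient {
    void send(String doc) throws IOException;
  }

  /**
   * Forward a document to each sub-shard leader synchronously: the first
   * failure aborts the whole update and reaches the caller, instead of
   * being recorded and checked after the fact.
   */
  static void syncForward(List<ShardClient> leaders, String doc) {
    for (ShardClient leader : leaders) {
      try {
        leader.send(doc); // blocks until the sub-shard responds
      } catch (IOException e) {
        throw new RuntimeException("Failed synchronous update on sub-shard", e);
      }
    }
  }
}
```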
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index bccc97eb134..c2b98009640 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -243,7 +243,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           .getReplicaProps(collection, shardId, coreNodeName, coreName,
               null, ZkStateReader.DOWN);
-      nodes = addSubShardLeaders(coll, shardId, id, doc, nodes);
       if (replicaProps != null) {
         if (nodes == null) {
           nodes = new ArrayList<Node>(replicaProps.size());
@@ -287,8 +286,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return nodes;
   }
 
-  private List<Node> addSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc, List<Node> nodes) {
+  private List<Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) {
     Collection<Slice> allSlices = coll.getSlices();
+    List<Node> nodes = null;
     for (Slice aslice : allSlices) {
       if (Slice.CONSTRUCTION.equals(aslice.getState())) {
         DocRouter.Range myRange = coll.getSlice(shardId).getRange();
@@ -373,8 +373,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           nodes.add(new StdNode(props));
         }
       }
-
-      nodes = addSubShardLeaders(zkController.getClusterState().getCollection(collection), shardId, null, null, nodes);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
@@ -395,7 +393,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     } else {
       isLeader = getNonZkLeaderAssumption(req);
     }
-    
+
     boolean dropCmd = false;
     if (!forwardToLeader) {
       dropCmd = versionAdd(cmd);
@@ -405,22 +403,35 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       // TODO: do we need to add anything to the response?
       return;
     }
-    
+
+    if (zkEnabled && isLeader) {
+      DocCollection coll = zkController.getClusterState().getCollection(collection);
+      List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
+      // the list will actually have only one element for an add request
+      if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
+        ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+        params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+        params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+            zkController.getBaseUrl(), req.getCore().getName()));
+        params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
+        for (Node subShardLeader : subShardLeaders) {
+          cmdDistrib.syncAdd(cmd, subShardLeader, params);
+        }
+      }
+    }
+
     ModifiableSolrParams params = null;
     if (nodes != null) {
-      
+
       params = new ModifiableSolrParams(filterParams(req.getParams()));
-      params.set(DISTRIB_UPDATE_PARAM, 
-          (isLeader ? 
-              DistribPhase.FROMLEADER.toString() : 
+      params.set(DISTRIB_UPDATE_PARAM,
+          (isLeader ?
+              DistribPhase.FROMLEADER.toString() :
               DistribPhase.TOLEADER.toString()));
       if (isLeader) {
         params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
       }
-      if (forwardToSubShard) {
-        params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
-      }
       params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
           zkController.getBaseUrl(), req.getCore().getName()));
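For context on `getSubShardLeaders`: a slice in `CONSTRUCTION` state is targeted only when its hash range is a subset of the parent shard's range and, for a routed add, the document's hash falls inside it. A self-contained sketch of that range test; `Range` here is a stand-in for Solr's `DocRouter.Range`, with assumed inclusive bounds:

```java
public class SubShardRangeSketch {
  /** Stand-in for DocRouter.Range: an inclusive interval of hash values. */
  static final class Range {
    final int min, max;
    Range(int min, int max) { this.min = min; this.max = max; }
    boolean isSubsetOf(Range r) { return min >= r.min && max <= r.max; }
    boolean includes(int hash)  { return hash >= min && hash <= max; }
  }

  /** A sub-shard under construction receives the update only if its range
   *  lies inside the parent's range and covers the document's hash. */
  static boolean targetsSubShard(Range parent, Range sub, int docHash) {
    return sub.isSubsetOf(parent) && sub.includes(docHash);
  }

  public static void main(String[] args) {
    Range parent = new Range(0, 100);
    System.out.println(targetsSubShard(parent, new Range(0, 50), 42));   // true
    System.out.println(targetsSubShard(parent, new Range(51, 100), 42)); // false
  }
}
```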
@@ -640,7 +651,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       if (willDistrib) {
         clonedDoc = cmd.solrDoc.deepCopy();
       }
-      
+
       // TODO: possibly set checkDeleteByQueries as a flag on the command?
       doLocalAdd(cmd);
 
@@ -781,13 +792,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       return;
     }
 
+    if (zkEnabled && isLeader) {
+      DocCollection coll = zkController.getClusterState().getCollection(collection);
+      List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
+      // the list will actually have only one element for a delete request
+      if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
+        ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+        params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+        params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+            zkController.getBaseUrl(), req.getCore().getName()));
+        params.set("distrib.from.parent", cloudDesc.getShardId());
+        cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+      }
+    }
+
     ModifiableSolrParams params = null;
     if (nodes != null) {
-      
+
       params = new ModifiableSolrParams(filterParams(req.getParams()));
-      params.set(DISTRIB_UPDATE_PARAM, 
-          (isLeader ? 
-              DistribPhase.FROMLEADER.toString() : 
+      params.set(DISTRIB_UPDATE_PARAM,
+          (isLeader ?
+              DistribPhase.FROMLEADER.toString() :
               DistribPhase.TOLEADER.toString()));
       if (isLeader) {
         params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
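The three parameters set before `syncAdd`/`syncDelete` are how a sub-shard leader recognizes a request forwarded from its parent: the distrib phase, the forwarding core's URL, and the parent shard id. A small sketch of that setup, with a plain `Map` standing in for `ModifiableSolrParams` (the key `update.distrib` is the assumed value of the `DISTRIB_UPDATE_PARAM` constant):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SubShardParamsSketch {
  /** Params a parent-shard leader attaches when forwarding to a sub-shard leader. */
  static Map<String, String> forSubShard(String leaderCoreUrl, String parentShardId) {
    Map<String, String> params = new LinkedHashMap<String, String>();
    params.put("update.distrib", "FROMLEADER");       // phase: forwarded by a leader
    params.put("distrib.from", leaderCoreUrl);        // who forwarded the request
    params.put("distrib.from.parent", parentShardId); // marks a parent -> sub-shard hop
    return params;
  }

  public static void main(String[] args) {
    System.out.println(forSubShard("http://host:8983/solr/collection1", "shard1"));
  }
}
```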
@@ -843,13 +869,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     DistribPhase phase =
         DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
 
+    DocCollection coll = zkController.getClusterState().getCollection(collection);
+
     if (zkEnabled && DistribPhase.NONE == phase) {
       boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
 
       ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
       outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
 
-      DocCollection coll = zkController.getClusterState().getCollection(collection);
       SolrParams params = req.getParams();
       Collection<Slice> slices = coll.getRouter().getSearchSlices(params.get(ShardParams.SHARD_KEYS), params, coll);
 
@@ -957,16 +984,22 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       vinfo.unblockUpdates();
     }
 
+    List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
     // forward to all replicas
-    if (leaderLogic && replicas != null) {
+    if (leaderLogic) {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
       params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
       params.set("update.from", ZkCoreNodeProps.getCoreUrl(
           zkController.getBaseUrl(), req.getCore().getName()));
-      cmdDistrib.distribDelete(cmd, replicas, params);
-      cmdDistrib.finish();
+      if (subShardLeaders != null) {
+        cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+      }
+      if (replicas != null) {
+        cmdDistrib.distribDelete(cmd, replicas, params);
+        cmdDistrib.finish();
+      }
     }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 1db6ddd144d..d551ba70364 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -78,7 +78,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     try {
       del("*:*");
       for (int id = 0; id < 100; id++) {
-        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
+        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
       }
       commit();
 
@@ -88,7 +88,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
           int max = atLeast(401);
           for (int id = 101; id < max; id++) {
             try {
-              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
+              indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
               Thread.sleep(atLeast(25));
             } catch (Exception e) {
               log.error("Exception while adding doc", e);
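The delete-by-query hunk above also fixes the fan-out order: sub-shard leaders are updated synchronously first, so a failure aborts the request, and only then are the shard's own replicas updated through the buffered asynchronous path whose errors are gathered at `finish()`. A rough sketch of that ordering under illustrative types (nothing here is Solr API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DeleteFanoutSketch {
  /** Stand-in for a node that can execute a delete-by-query (hypothetical). */
  interface Target {
    void delete(String query) throws Exception;
  }

  static void fanout(List<Target> subShardLeaders, List<Target> replicas, String q) throws Exception {
    // 1) sub-shard leaders, synchronously: any failure propagates to the client
    for (Target sub : subShardLeaders) {
      sub.delete(q);
    }
    // 2) replicas, asynchronously: errors surface when the futures are joined,
    //    loosely analogous to cmdDistrib.finish()
    List<CompletableFuture<Void>> pending = new ArrayList<CompletableFuture<Void>>();
    for (Target replica : replicas) {
      pending.add(CompletableFuture.runAsync(() -> {
        try {
          replica.delete(q);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }));
    }
    for (CompletableFuture<Void> f : pending) {
      f.join();
    }
  }
}
```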
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
index a999ce6391d..f79b0fe32e8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
@@ -28,10 +28,8 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
-import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -45,9 +43,13 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 public class ShardSplitTest extends BasicDistributedZkTest {
 
@@ -110,35 +112,53 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     del("*:*");
     for (int id = 0; id <= 100; id++) {
       String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
-      indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id));
+      indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id), id);
     }
     commit();
 
     Thread indexThread = new Thread() {
       @Override
       public void run() {
-        int max = atLeast(401);
+        Random random = random();
+        int max = atLeast(random, 401);
+        int sleep = atLeast(random, 25);
+        log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
+        Set<String> deleted = new HashSet<String>();
         for (int id = 101; id < max; id++) {
           try {
-            indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
-            Thread.sleep(atLeast(25));
+            indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id);
+            Thread.sleep(sleep);
+            if (usually(random)) {
+              String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
+              if (deleted.contains(delId)) continue;
+              try {
+                deleteAndUpdateCount(router, ranges, docCounts, delId);
+                deleted.add(delId);
+              } catch (Exception e) {
+                log.error("Exception while deleting docs", e);
+              }
+            }
           } catch (Exception e) {
-            log.error("Exception while adding doc", e);
+            log.error("Exception while adding docs", e);
           }
         }
       }
     };
     indexThread.start();
 
-    splitShard(SHARD1);
-
-    log.info("Layout after split: \n");
-    printLayout();
-
-    indexThread.join();
+    try {
+      splitShard(SHARD1);
+      log.info("Layout after split: \n");
+      printLayout();
+    } finally {
+      try {
+        indexThread.join();
+      } catch (InterruptedException e) {
+        log.error("Indexing thread interrupted", e);
+      }
+    }
 
     commit();
-
     checkDocCountsAndShardStates(docCounts, numReplicas);
 
     // todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
@@ -148,24 +168,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   }
 
   protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws SolrServerException, KeeperException, InterruptedException {
-    SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
-    query.set("distrib", false);
-
-    ZkCoreNodeProps shard1_0 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_0);
-    HttpSolrServer shard1_0Server = new HttpSolrServer(shard1_0.getCoreUrl());
-    QueryResponse response = shard1_0Server.query(query);
-    long shard10Count = response.getResults().getNumFound();
-
-    ZkCoreNodeProps shard1_1 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_1);
-    HttpSolrServer shard1_1Server = new HttpSolrServer(shard1_1.getCoreUrl());
-    QueryResponse response2 = shard1_1Server.query(query);
-    long shard11Count = response2.getResults().getNumFound();
-
-    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
-
-    assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
-    assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
-
     ClusterState clusterState = null;
     Slice slice1_0 = null, slice1_1 = null;
     int i = 0;
@@ -188,6 +190,24 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     assertEquals("shard1_1 is not active", Slice.ACTIVE, slice1_1.getState());
     assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
     assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
+
+    SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
+    query.set("distrib", false);
+
+    ZkCoreNodeProps shard1_0 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_0);
+    HttpSolrServer shard1_0Server = new HttpSolrServer(shard1_0.getCoreUrl());
+    QueryResponse response = shard1_0Server.query(query);
+    long shard10Count = response.getResults().getNumFound();
+
+    ZkCoreNodeProps shard1_1 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_1);
+    HttpSolrServer shard1_1Server = new HttpSolrServer(shard1_1.getCoreUrl());
+    QueryResponse response2 = shard1_1Server.query(query);
+    long shard11Count = response2.getResults().getNumFound();
+
+    logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
+
+    assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
+    assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
   }
 
   protected void splitShard(String shardId) throws SolrServerException, IOException {
@@ -208,9 +228,26 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     baseServer.request(request);
   }
 
-  protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) throws Exception {
-    index("id", id);
+  protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id, int n) throws Exception {
+    index("id", id, "n_ti", n);
+    int idx = getHashRangeIdx(router, ranges, docCounts, id);
+    if (idx != -1) {
+      docCounts[idx]++;
+    }
+  }
+
+  protected void deleteAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) throws Exception {
+    controlClient.deleteById(id);
+    cloudClient.deleteById(id);
+
+    int idx = getHashRangeIdx(router, ranges, docCounts, id);
+    if (idx != -1) {
+      docCounts[idx]--;
+    }
+  }
+
+  private int getHashRangeIdx(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) {
     int hash = 0;
     if (router instanceof HashBasedRouter) {
       HashBasedRouter hashBasedRouter = (HashBasedRouter) router;
@@ -219,8 +256,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     for (int i = 0; i < ranges.size(); i++) {
       DocRouter.Range range = ranges.get(i);
       if (range.includes(hash))
-        docCounts[i]++;
+        return i;
     }
+    return -1;
   }
 
   protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count) {
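Finally, the test refactor at the end extracts the hash-range lookup so the add and delete paths share it. Standalone, the helper has this shape; `Range` is again a stand-in for `DocRouter.Range`:

```java
import java.util.Arrays;
import java.util.List;

public class HashRangeIdxSketch {
  /** Stand-in for DocRouter.Range: an inclusive hash interval. */
  static final class Range {
    final int min, max;
    Range(int min, int max) { this.min = min; this.max = max; }
    boolean includes(int hash) { return hash >= min && hash <= max; }
  }

  /** Index of the first range containing hash, or -1 if none does, so the
   *  caller can count a doc up (on index) or down (on delete) in the right bucket. */
  static int getHashRangeIdx(List<Range> ranges, int hash) {
    for (int i = 0; i < ranges.size(); i++) {
      if (ranges.get(i).includes(hash)) return i;
    }
    return -1;
  }

  public static void main(String[] args) {
    List<Range> ranges = Arrays.asList(new Range(-100, -1), new Range(0, 99));
    System.out.println(getHashRangeIdx(ranges, 42));   // 1
    System.out.println(getHashRangeIdx(ranges, 1000)); // -1
  }
}
```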