From de58717183d0690254daa56d7dad7692bb435c4a Mon Sep 17 00:00:00 2001 From: Bar Rotstein Date: Tue, 19 Mar 2019 13:26:31 -0400 Subject: [PATCH] SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud specifics into a subclass Closes #528 (cherry picked from commit 5b7866b0851eff66cb7e929beef5249e3c72ac36) --- solr/CHANGES.txt | 4 + .../update/processor/CdcrUpdateProcessor.java | 2 +- .../processor/DistributedUpdateProcessor.java | 1311 ++--------------- .../DistributedUpdateProcessorFactory.java | 9 +- .../DistributedZkUpdateProcessor.java | 1235 ++++++++++++++++ .../DocBasedVersionConstraintsProcessor.java | 3 +- ...SkipExistingDocumentsProcessorFactory.java | 3 +- .../AtomicUpdateProcessorFactoryTest.java | 2 +- .../DistributedUpdateProcessorTest.java | 4 +- .../java/org/apache/solr/SolrTestCaseJ4.java | 11 + 10 files changed, 1376 insertions(+), 1208 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index bc1de23ab31..5f61fc9bf95 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -107,6 +107,7 @@ Bug Fixes Improvements ---------------------- + * SOLR-12999: Index replication could delete segments before downloading segments from master if there is not enough disk space (noble) @@ -152,6 +153,9 @@ Other Changes * SOLR-8033: Remove debug if branch in HdfsTransactionLog (Kevin Risden) +* SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud functionality into a subclass. + (Bar Rotstein, David Smiley) + ================== 8.0.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java index ee454670767..fe13a91bdce 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; * by the target cluster. *

*/ -public class CdcrUpdateProcessor extends DistributedUpdateProcessor { +public class CdcrUpdateProcessor extends DistributedZkUpdateProcessor { public static final String CDCR_UPDATE = "cdcr.update"; diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index a5d28985d2d..50660cba684 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -16,23 +16,11 @@ */ package org.apache.solr.update.processor; -import static org.apache.solr.common.params.CommonParams.DISTRIB; -import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; - import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import org.apache.lucene.util.BytesRef; @@ -41,38 +29,20 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest.METHOD; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.request.GenericSolrRequest; -import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.SimpleSolrResponse; -import org.apache.solr.cloud.CloudDescriptor; -import org.apache.solr.cloud.Overseer; -import org.apache.solr.cloud.ZkController; -import org.apache.solr.cloud.ZkShardTerms; -import org.apache.solr.cloud.overseer.OverseerAction; import 
org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.CompositeIdRouter; import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.RoutingRule; -import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.Slice.State; -import org.apache.solr.common.cloud.ZkCoreNodeProps; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.Hash; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.TimeSource; -import org.apache.solr.common.util.Utils; -import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.component.RealTimeGetComponent; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -80,14 +50,9 @@ import org.apache.solr.schema.SchemaField; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; -import org.apache.solr.update.MergeIndexesCommand; -import org.apache.solr.update.RollbackUpdateCommand; import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.SolrCmdDistributor.Error; import org.apache.solr.update.SolrCmdDistributor.Node; -import org.apache.solr.update.SolrCmdDistributor.ForwardNode; -import org.apache.solr.update.SolrCmdDistributor.StdNode; -import 
org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.update.UpdateCommand; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateShardHandler; @@ -95,10 +60,12 @@ import org.apache.solr.update.VersionBucket; import org.apache.solr.update.VersionInfo; import org.apache.solr.util.TestInjection; import org.apache.solr.util.TimeOut; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.common.params.CommonParams.DISTRIB; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + // NOT mt-safe... create a new processor for each add thread // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for public class DistributedUpdateProcessor extends UpdateRequestProcessor { @@ -109,7 +76,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public static final String DISTRIB_FROM_PARENT = "distrib.from.parent"; public static final String DISTRIB_FROM = "distrib.from"; public static final String DISTRIB_INPLACE_PREVVERSION = "distrib.inplace.prevversion"; - private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; + protected static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** @@ -150,9 +117,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // used to assert we don't call finish more than once, see finish() private boolean finished = false; - private final SolrQueryRequest req; - private final SolrQueryResponse rsp; - private final UpdateRequestProcessor next; + protected final SolrQueryRequest req; + protected final SolrQueryResponse rsp; private final AtomicUpdateDocumentMerger docMerger; private final UpdateLog ulog; @@ -167,47 +133,28 @@ public class 
DistributedUpdateProcessor extends UpdateRequestProcessor { private final SchemaField idField; - private SolrCmdDistributor cmdDistrib; - - private final boolean zkEnabled; - - private final CloudDescriptor cloudDesc; - private final String collection; - private final ZkController zkController; - // these are setup at the start of each request processing // method in this update processor - private boolean isLeader = true; - private boolean forwardToLeader = false; - private boolean isSubShardLeader = false; - private List nodes; - private Set skippedCoreNodeNames; - private boolean isIndexChanged = false; - - private boolean readOnlyCollection = false; + protected boolean isLeader = true; + protected boolean forwardToLeader = false; + protected boolean isSubShardLeader = false; + protected boolean isIndexChanged = false; /** * Number of times requests forwarded to some other shard's leader can be retried */ - private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT; + protected final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT; /** * Number of times requests from leaders to followers can be retried */ - private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT; + protected final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT; - private UpdateCommand updateCommand; // the current command this processor is working on. + protected UpdateCommand updateCommand; // the current command this processor is working on. - //used for keeping track of replicas that have processed an add/update from the leader - private RollupRequestReplicationTracker rollupReplicationTracker = null; - private LeaderRequestReplicationTracker leaderReplicationTracker = null; + protected final Replica.Type replicaType; - // should we clone the document before sending it to replicas? 
- // this is set to true in the constructor if the next processors in the chain - // are custom and may modify the SolrInputDocument racing with its serialization for replication - private final boolean cloneRequiredOnLeader; - private final Replica.Type replicaType; - - public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, + UpdateRequestProcessor next) { this(req, rsp, new AtomicUpdateDocumentMerger(req), next); } @@ -215,12 +162,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { * @lucene.experimental */ public DistributedUpdateProcessor(SolrQueryRequest req, - SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger, UpdateRequestProcessor next) { + SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger, + UpdateRequestProcessor next) { super(next); this.rsp = rsp; - this.next = next; this.docMerger = docMerger; this.idField = req.getSchema().getUniqueKeyField(); + this.req = req; + this.replicaType = computeReplicaType(); // version init this.ulog = req.getCore().getUpdateHandler().getUpdateLog(); @@ -231,473 +180,32 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // TODO: better way to get the response, or pass back info to it? // SolrRequestInfo reqInfo = returnVersions ? 
SolrRequestInfo.getRequestInfo() : null; - this.req = req; - // this should always be used - see filterParams DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD, UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS); - CoreContainer cc = req.getCore().getCoreContainer(); - - this.zkEnabled = cc.isZooKeeperAware(); - zkController = cc.getZkController(); - if (zkEnabled) { - cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); - } //this.rsp = reqInfo != null ? reqInfo.getRsp() : null; - cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor(); - - if (cloudDesc != null) { - collection = cloudDesc.getCollectionName(); - replicaType = cloudDesc.getReplicaType(); - DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection); - if (coll != null) { - // check readOnly property in coll state - readOnlyCollection = coll.isReadOnly(); - } - } else { - collection = null; - replicaType = Replica.Type.NRT; - } - - boolean shouldClone = false; - UpdateRequestProcessor nextInChain = next; - while (nextInChain != null) { - Class klass = nextInChain.getClass(); - if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class - && klass != RunUpdateProcessor.class - && klass != TolerantUpdateProcessor.class) { - shouldClone = true; - break; - } - nextInChain = nextInChain.next; - } - cloneRequiredOnLeader = shouldClone; } - private boolean isReadOnly() { - return readOnlyCollection || req.getCore().readOnly; + /** + * + * @return the replica type of the collection. 
+ */ + protected Replica.Type computeReplicaType() { + return Replica.Type.NRT; } - private List setupRequest(String id, SolrInputDocument doc) { - return setupRequest(id, doc, null); + boolean isLeader() { + return isLeader; } - private List setupRequest(String id, SolrInputDocument doc, String route) { - // if we are in zk mode... - if (!zkEnabled) { - return null; - } - - assert TestInjection.injectUpdateRandomPause(); - - if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { - isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. - forwardToLeader = false; - return null; - } - - ClusterState cstate = zkController.getClusterState(); - DocCollection coll = cstate.getCollection(collection); - Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); - - if (slice == null) { - // No slice found. Most strict routers will have already thrown an exception, so a null return is - // a signal to use the slice of this core. - // TODO: what if this core is not in the targeted collection? - String shardId = cloudDesc.getShardId(); - slice = coll.getSlice(shardId); - if (slice == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll); - } - } - - DistribPhase phase = - DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - - if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) { - if (cloudDesc.isLeader()) { - // locally we think we are leader but the request says it came FROMLEADER - // that could indicate a problem, let the full logic below figure it out - } else { - - assert TestInjection.injectFailReplicaRequests(); - - isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. 
- forwardToLeader = false; - return null; - } - } - - String shardId = slice.getName(); - - try { - // Not equivalent to getLeaderProps, which retries to find a leader. - // Replica leader = slice.getLeader(); - Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); - isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); - - if (!isLeader) { - isSubShardLeader = amISubShardLeader(coll, slice, id, doc); - if (isSubShardLeader) { - shardId = cloudDesc.getShardId(); - leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); - } - } - - doDefensiveChecks(phase); - - // if request is coming from another collection then we want it to be sent to all replicas - // even if its phase is FROMLEADER - String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); - - if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) { - // we are coming from the leader, just go local - add no urls - forwardToLeader = false; - return null; - } else if (isLeader || isSubShardLeader) { - // that means I want to forward onto my replicas... - // so get the replicas... 
- forwardToLeader = false; - ClusterState clusterState = zkController.getZkStateReader().getClusterState(); - String leaderCoreNodeName = leaderReplica.getName(); - List replicas = clusterState.getCollection(collection) - .getSlice(shardId) - .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); - if (replicas.isEmpty()) { - return null; - } - - // check for test param that lets us miss replicas - String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); - Set skipListSet = null; - if (skipList != null) { - skipListSet = new HashSet<>(skipList.length); - skipListSet.addAll(Arrays.asList(skipList)); - log.info("test.distrib.skip.servers was found and contains:" + skipListSet); - } - - List nodes = new ArrayList<>(replicas.size()); - skippedCoreNodeNames = new HashSet<>(); - ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); - for (Replica replica: replicas) { - String coreNodeName = replica.getName(); - if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { - log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); - } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { - log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); - skippedCoreNodeNames.add(replica.getName()); - } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { - skippedCoreNodeNames.add(replica.getName()); - } else { - nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers)); - } - } - return nodes; - - } else { - // I need to forward on to the leader... 
- forwardToLeader = true; - return Collections.singletonList( - new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward)); - } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); - } - } - - /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ - private boolean couldIbeSubShardLeader(DocCollection coll) { - // Could I be the leader of a shard in "construction/recovery" state? - String myShardId = cloudDesc.getShardId(); - Slice mySlice = coll.getSlice(myShardId); - State state = mySlice.getState(); - return state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY; - } - - /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ - private boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException { - // Am I the leader of a shard in "construction/recovery" state? - String myShardId = cloudDesc.getShardId(); - Slice mySlice = coll.getSlice(myShardId); - final State state = mySlice.getState(); - if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { - Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId); - boolean amILeader = myLeader.getName().equals(cloudDesc.getCoreNodeName()); - if (amILeader) { - // Does the document belong to my hash range as well? 
- DocRouter.Range myRange = mySlice.getRange(); - if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); - if (parentSlice != null) { - boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange()); - return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll); - } else { - // delete by query case -- as long as I am a sub shard leader we're fine - return true; - } - } - } - return false; - } - - private List getReplicaNodesForLeader(String shardId, Replica leaderReplica) { - ClusterState clusterState = zkController.getZkStateReader().getClusterState(); - String leaderCoreNodeName = leaderReplica.getName(); - List replicas = clusterState.getCollection(collection) - .getSlice(shardId) - .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); - if (replicas.isEmpty()) { - return null; - } - - // check for test param that lets us miss replicas - String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); - Set skipListSet = null; - if (skipList != null) { - skipListSet = new HashSet<>(skipList.length); - skipListSet.addAll(Arrays.asList(skipList)); - log.info("test.distrib.skip.servers was found and contains:" + skipListSet); - } - - List nodes = new ArrayList<>(replicas.size()); - skippedCoreNodeNames = new HashSet<>(); - ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); - for (Replica replica : replicas) { - String coreNodeName = replica.getName(); - if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { - log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); - } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { - log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); - 
skippedCoreNodeNames.add(replica.getName()); - } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) - || replica.getState() == Replica.State.DOWN) { - skippedCoreNodeNames.add(replica.getName()); - } else { - nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId)); - } - } - return nodes; - } - - /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ - private List getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) { - Collection allSlices = coll.getSlices(); - List nodes = null; - for (Slice aslice : allSlices) { - final Slice.State state = aslice.getState(); - if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { - DocRouter.Range myRange = coll.getSlice(shardId).getRange(); - if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); - boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange); - if (isSubset && - (docId == null // in case of deletes - || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) { - Replica sliceLeader = aslice.getLeader(); - // slice leader can be null because node/shard is created zk before leader election - if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) { - if (nodes == null) nodes = new ArrayList<>(); - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader); - nodes.add(new StdNode(nodeProps, coll.getName(), aslice.getName())); - } - } - } - } - return nodes; - } - - /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE} */ - private List getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) { - DocRouter router = coll.getRouter(); - List nodes = null; - if (router instanceof CompositeIdRouter) { - CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; - 
String myShardId = cloudDesc.getShardId(); - Slice slice = coll.getSlice(myShardId); - Map routingRules = slice.getRoutingRules(); - if (routingRules != null) { - - // delete by query case - if (id == null) { - for (Entry entry : routingRules.entrySet()) { - String targetCollectionName = entry.getValue().getTargetCollectionName(); - final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName); - if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) { - final Slice[] activeSlices = docCollection.getActiveSlicesArr(); - Slice any = activeSlices[0]; - if (nodes == null) nodes = new ArrayList<>(); - nodes.add(new StdNode(new ZkCoreNodeProps(any.getLeader()))); - } - } - return nodes; - } - - String routeKey = SolrIndexSplitter.getRouteKey(id); - if (routeKey != null) { - RoutingRule rule = routingRules.get(routeKey + "!"); - if (rule != null) { - if (! rule.isExpired()) { - List ranges = rule.getRouteRanges(); - if (ranges != null && !ranges.isEmpty()) { - int hash = compositeIdRouter.sliceHash(id, doc, null, coll); - for (DocRouter.Range range : ranges) { - if (range.includes(hash)) { - DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName()); - Collection activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl); - if (activeSlices == null || activeSlices.isEmpty()) { - throw new SolrException(ErrorCode.SERVER_ERROR, - "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName()); - } - Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName()); - nodes = new ArrayList<>(1); - nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader))); - break; - } - } - } - } else { - ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock(); - if (!ruleExpiryLock.isLocked()) { - try { - if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) { - log.info("Going to expire routing rule"); - try { - Map map = 
Utils.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(), - ZkStateReader.COLLECTION_PROP, collection, - ZkStateReader.SHARD_ID_PROP, myShardId, - "routeKey", routeKey + "!"); - zkController.getOverseer().offerStateUpdate(Utils.toJSON(map)); - } catch (KeeperException e) { - log.warn("Exception while removing routing rule for route key: " + routeKey, e); - } catch (Exception e) { - log.error("Exception while removing routing rule for route key: " + routeKey, e); - } finally { - ruleExpiryLock.unlock(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - } - } - } - return nodes; - } - - private void doDefensiveChecks(DistribPhase phase) { - boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; - if (isReplayOrPeersync) return; - - String from = req.getParams().get(DISTRIB_FROM); - ClusterState clusterState = zkController.getClusterState(); - - DocCollection docCollection = clusterState.getCollection(collection); - Slice mySlice = docCollection.getSlice(cloudDesc.getShardId()); - boolean localIsLeader = cloudDesc.isLeader(); - if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay - String fromShard = req.getParams().get(DISTRIB_FROM_PARENT); - if (fromShard != null) { - if (mySlice.getState() == Slice.State.ACTIVE) { - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, - "Request says it is coming from parent shard leader but we are in active state"); - } - // shard splitting case -- check ranges to see if we are a sub-shard - Slice fromSlice = docCollection.getSlice(fromShard); - DocRouter.Range parentRange = fromSlice.getRange(); - if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); - if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) { - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, - "Request 
says it is coming from parent shard leader but parent hash range is not superset of my range"); - } - } else { - String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule? - if (fromCollection == null) { - log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); - SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader"); - solrExc.setMetadata("cause", "LeaderChanged"); - throw solrExc; - } - } - } - - int count = 0; - while (((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) && count < 5) { - count++; - // re-getting localIsLeader since we published to ZK first before setting localIsLeader value - localIsLeader = cloudDesc.isLeader(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) { - log.error("ClusterState says we are the leader, but locally we don't think so"); - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, - "ClusterState says we are the leader (" + zkController.getBaseUrl() - + "/" + req.getCore().getName() + "), but locally we don't think so. Request came from " + from); - } - } - - - // used for deleteByQuery to get the list of nodes this leader should forward to - private List setupRequestForDBQ() { - List nodes = null; - String shardId = cloudDesc.getShardId(); - - try { - Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); - isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); - - // TODO: what if we are no longer the leader? 
- - forwardToLeader = false; - List replicaProps = zkController.getZkStateReader() - .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - if (replicaProps != null) { - nodes = new ArrayList<>(replicaProps.size()); - for (ZkCoreNodeProps props : replicaProps) { - nodes.add(new StdNode(props, collection, shardId)); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } - - return nodes; - } - - @Override public void processAdd(AddUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); - if (isReadOnly()) { - throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); - } - - updateCommand = cmd; - - if (zkEnabled) { - zkCheck(); - nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument()); - } else { - isLeader = getNonZkLeaderAssumption(req); - } - - // check if client has requested minimum replication factor information. 
will set replicationTracker to null if - // we aren't the leader or subShardLeader - checkReplicationTracker(cmd); + setupRequest(cmd); // If we were sent a previous version, set this to the AddUpdateCommand (if not already set) if (!cmd.isInPlaceUpdate()) { @@ -717,58 +225,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return; } - if (zkEnabled && isLeader && !isSubShardLeader) { - DocCollection coll = zkController.getClusterState().getCollection(collection); - List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument()); - // the list will actually have only one element for an add request - if (subShardLeaders != null && !subShardLeaders.isEmpty()) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); - cmdDistrib.distribAdd(cmd, subShardLeaders, params, true); - } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument()); - if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - params.set(DISTRIB_FROM_COLLECTION, collection); - params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); - cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true); - } - } - - if (nodes != null) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, - (isLeader || isSubShardLeader 
? - DistribPhase.FROMLEADER.toString() : - DistribPhase.TOLEADER.toString())); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - - if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { - // TODO: Kept for rolling upgrades only. Should be removed in Solr 9 - params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); - } - - if (cmd.isInPlaceUpdate()) { - params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion)); - - // Use synchronous=true so that a new connection is used, instead - // of the update being streamed through an existing streaming client. - // When using a streaming client, the previous update - // and the current in-place update (that depends on the previous update), if reordered - // in the stream, can result in the current update being bottled up behind the previous - // update in the stream and can lead to degraded performance. - cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker); - } else { - cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); - } - } + doDistribAdd(cmd); // TODO: what to do when no idField? if (returnVersions && rsp != null && idField != null) { @@ -788,218 +245,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } - // helper method, processAdd was getting a bit large. 
- // Sets replicationTracker = null if we aren't the leader - // We have two possibilities here: - // - // 1> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker - // 2> we're a follower: allocat a RollupTracker - // - private void checkReplicationTracker(UpdateCommand cmd) { - if (zkEnabled == false) { - rollupReplicationTracker = null; // never need one of these in stand-alone - leaderReplicationTracker = null; - return; - } - - SolrParams rp = cmd.getReq().getParams(); - String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM); - // Ok,we're receiving the original request, we need a rollup tracker, but only one so we accumulate over the - // course of a batch. - if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) && - rollupReplicationTracker == null) { - rollupReplicationTracker = new RollupRequestReplicationTracker(); - } - // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in - // a batch we need to use the _same_ leader replication tracker. - if (isLeader && leaderReplicationTracker == null) { - leaderReplicationTracker = new LeaderRequestReplicationTracker( - req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); - } - } - - - @Override - protected void doClose() { - if (cmdDistrib != null) { - cmdDistrib.close(); - } - } - - // TODO: optionally fail if n replicas are not reached... 
- private void doFinish() { - boolean shouldUpdateTerms = isLeader && isIndexChanged; - if (shouldUpdateTerms) { - ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()); - if (skippedCoreNodeNames != null) { - zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames); - } - zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero(); - } - // TODO: if not a forward and replication req is not specified, we could - // send in a background thread - - cmdDistrib.finish(); - List errors = cmdDistrib.getErrors(); - // TODO - we may need to tell about more than one error... - - List errorsForClient = new ArrayList<>(errors.size()); - Set replicasShouldBeInLowerTerms = new HashSet<>(); - for (final SolrCmdDistributor.Error error : errors) { - - if (error.req.node instanceof ForwardNode) { - // if it's a forward, any fail is a problem - - // otherwise we assume things are fine if we got it locally - // until we start allowing min replication param - errorsForClient.add(error); - continue; - } - - // else... 
- - // for now we don't error - we assume if it was added locally, we - // succeeded - if (log.isWarnEnabled()) { - log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e); - } - - // Since it is not a forward request, for each fail, try to tell them to - // recover - the doc was already added locally, so it should have been - // legit - - DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM)); - if (phase != DistribPhase.FROMLEADER) - continue; // don't have non-leaders try to recovery other nodes - - // commits are special -- they can run on any node irrespective of whether it is a leader or not - // we don't want to run recovery on a node which missed a commit command - if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) - continue; - - final String replicaUrl = error.req.node.getUrl(); - - // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request - String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null; - if ("LeaderChanged".equals(cause)) { - // let's just fail this request and let the client retry? or just call processAdd again? - log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+ - " now thinks it is the leader! Failing the request to let the client retry! "+error.e); - errorsForClient.add(error); - continue; - } - - String collection = null; - String shardId = null; - - if (error.req.node instanceof StdNode) { - StdNode stdNode = (StdNode)error.req.node; - collection = stdNode.getCollection(); - shardId = stdNode.getShardId(); - - // before we go setting other replicas to down, make sure we're still the leader! 
- String leaderCoreNodeName = null; - Exception getLeaderExc = null; - Replica leaderProps = null; - try { - leaderProps = zkController.getZkStateReader().getLeader(collection, shardId); - if (leaderProps != null) { - leaderCoreNodeName = leaderProps.getName(); - } - } catch (Exception exc) { - getLeaderExc = exc; - } - if (leaderCoreNodeName == null) { - log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " + - "before putting {} into leader-initiated recovery", - cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc); - } - - List myReplicas = zkController.getZkStateReader().getReplicaProps(collection, - cloudDesc.getShardId(), cloudDesc.getCoreNodeName()); - boolean foundErrorNodeInReplicaList = false; - if (myReplicas != null) { - for (ZkCoreNodeProps replicaProp : myReplicas) { - if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) { - foundErrorNodeInReplicaList = true; - break; - } - } - } - - if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader - && foundErrorNodeInReplicaList // we found an error for one of replicas - && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR - try { - String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName(); - // if false, then the node is probably not "live" anymore - // and we do not need to send a recovery message - Throwable rootCause = SolrException.getRootCause(error.e); - log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause); - replicasShouldBeInLowerTerms.add(coreNodeName); - } catch (Exception exc) { - Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc); - log.error("Leader failed to set replica " + - error.req.node.getUrl() + " state to DOWN due to: 
" + setLirZnodeFailedCause, setLirZnodeFailedCause); - } - } else { - // not the leader anymore maybe or the error'd node is not my replica? - if (!foundErrorNodeInReplicaList) { - log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+ - cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " + - "No request recovery command will be sent!"); - if (!shardId.equals(cloudDesc.getShardId())) { - // some replicas on other shard did not receive the updates (ex: during splitshard), - // exception must be notified to clients - errorsForClient.add(error); - } - } else { - log.warn("Core " + cloudDesc.getCoreNodeName() + " is no longer the leader for " + collection + " " - + shardId + " or we tried to put ourself into LIR, no request recovery command will be sent!"); - } - } - } - } - if (!replicasShouldBeInLowerTerms.isEmpty()) { - zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()) - .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms); - } - handleReplicationFactor(); - if (0 < errorsForClient.size()) { - throw new DistributedUpdatesAsyncException(errorsForClient); - } - } - - /** - * If necessary, include in the response the achieved replication factor - */ - @SuppressWarnings("deprecation") - private void handleReplicationFactor() { - if (leaderReplicationTracker != null || rollupReplicationTracker != null) { - int achievedRf = Integer.MAX_VALUE; - - if (leaderReplicationTracker != null) { - - achievedRf = leaderReplicationTracker.getAchievedRf(); - - // Transfer this to the rollup tracker if it exists - if (rollupReplicationTracker != null) { - rollupReplicationTracker.testAndSetAchievedRf(achievedRf); - } - } - - // Rollup tracker has accumulated stats. 
- if (rollupReplicationTracker != null) { - achievedRf = rollupReplicationTracker.getAchievedRf(); - } - if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { - // Unused, but kept for back compatibility. To be removed in Solr 9 - rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT))); - } - rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf); - rollupReplicationTracker = null; - leaderReplicationTracker = null; - - } + protected void doDistribAdd(AddUpdateCommand cmd) throws IOException { + // no-op for derived classes to implement } // must be synchronized by bucket @@ -1217,17 +464,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; - - SolrInputDocument clonedDoc = null; - if (willDistrib && cloneRequiredOnLeader) { - clonedDoc = cmd.solrDoc.deepCopy(); - } + SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null; // TODO: possibly set checkDeleteByQueries as a flag on the command? doLocalAdd(cmd); - if (willDistrib && cloneRequiredOnLeader) { + if (clonedDoc != null) { cmd.solrDoc = clonedDoc; } } finally { @@ -1244,6 +486,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + /** + * + * @return whether cmd doc should be cloned before localAdd + */ + protected boolean shouldCloneCmdDoc() { + return false; + } + @VisibleForTesting boolean shouldBufferUpdate(AddUpdateCommand cmd, boolean isReplayOrPeersync, UpdateLog.State state) { if (state == UpdateLog.State.APPLYING_BUFFERED @@ -1347,23 +597,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { params.set("onlyIfActive", true); SolrRequest ur = new GenericSolrRequest(METHOD.GET, "/get", params); - String leaderUrl = req.getParams().get(DISTRIB_FROM); - - if (leaderUrl == null) { - // An update we're dependent upon didn't arrive! 
This is unexpected. Perhaps likely our leader is - // down or partitioned from us for some reason. Lets force refresh cluster state, and request the - // leader for the update. - if (zkController == null) { // we should be in cloud mode, but wtf? could be a unit test - throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader " - + "failed since we're not in cloud mode."); - } - Replica leader; - try { - leader = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()); - } catch (InterruptedException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e); - } - leaderUrl = leader.getCoreUrl(); + String leaderUrl = getLeaderUrl(id); + + if(leaderUrl == null) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id); } NamedList rsp; @@ -1433,11 +670,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void processDelete(DeleteUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); - - if (isReadOnly()) { - throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); - } - + updateCommand = cmd; if (!cmd.isDeleteById()) { @@ -1451,17 +684,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // have any documents specified by those IDs, the request is not forwarded to any other replicas on that shard. Thus // we have to spoof the replicationTracker and set the achieved rf to the number of active replicas. // - private void doDeleteById(DeleteUpdateCommand cmd) throws IOException { - if (zkEnabled) { - zkCheck(); - nodes = setupRequest(cmd.getId(), null, cmd.getRoute()); - } else { - isLeader = getNonZkLeaderAssumption(req); - } + protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException { - // check if client has requested minimum replication factor information. 
will set replicationTracker to null if - // we aren't the leader or subShardLeader - checkReplicationTracker(cmd); + setupRequest(cmd); boolean dropCmd = false; if (!forwardToLeader) { @@ -1473,45 +698,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return; } - if (zkEnabled && isLeader && !isSubShardLeader) { - DocCollection coll = zkController.getClusterState().getCollection(collection); - List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null); - // the list will actually have only one element for an add request - if (subShardLeaders != null && !subShardLeaders.isEmpty()) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); - cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null); - } - - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null); - if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - params.set(DISTRIB_FROM_COLLECTION, collection); - params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); - cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, null, null); - } - } - - if (nodes != null) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, - (isLeader || isSubShardLeader ? 
DistribPhase.FROMLEADER.toString() - : DistribPhase.TOLEADER.toString())); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - - if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { - // TODO: Kept for rolling upgrades only. Remove in Solr 9 - params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); - } - cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); - } + doDistribDeleteById(cmd); // cmd.getIndexId == null when delete by query // TODO: what to do when no idField? @@ -1526,6 +713,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + /** + * This method can be overridden to tamper with the cmd after the localDeleteById operation + * @param cmd the delete command + * @throws IOException in case post processing failed + */ + protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException { + // no-op for derived classes to implement + } + /** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */ @SuppressWarnings("unchecked") protected ModifiableSolrParams filterParams(SolrParams params) { @@ -1549,103 +745,24 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { - + /** + * for implementing classes to setup request data(nodes, replicas) + * @param cmd the delete command being processed + */ + protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { // even in non zk mode, tests simulate updates from a leader - if(!zkEnabled) { - isLeader = getNonZkLeaderAssumption(req); - } else { - zkCheck(); - } - - // NONE: we are the first to receive this deleteByQuery - // - it must be forwarded to the leader of every shard - // TO: we are a leader receiving a forwarded deleteByQuery... 
we must: - // - block all updates (use VersionInfo) - // - flush *all* updates going to our replicas - // - forward the DBQ to our replicas and wait for the response - // - log + execute the local DBQ - // FROM: we are a replica receiving a DBQ from our leader - // - log + execute the local DBQ - DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - - DocCollection coll = zkEnabled - ? zkController.getClusterState().getCollection(collection) : null; - - if (zkEnabled && DistribPhase.NONE == phase) { - if (rollupReplicationTracker == null) { - rollupReplicationTracker = new RollupRequestReplicationTracker(); - } - boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard - - ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams())); - outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); - outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - - SolrParams params = req.getParams(); - String route = params.get(ShardParams._ROUTE_); - Collection slices = coll.getRouter().getSearchSlices(route, params, coll); - - List leaders = new ArrayList<>(slices.size()); - for (Slice slice : slices) { - String sliceName = slice.getName(); - Replica leader; - try { - leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName); - } catch (InterruptedException e) { - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e); - } - - // TODO: What if leaders changed in the meantime? - // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? - - // Am I the leader for this slice? 
- ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader); - String leaderCoreNodeName = leader.getName(); - String coreNodeName = cloudDesc.getCoreNodeName(); - isLeader = coreNodeName.equals(leaderCoreNodeName); - - if (isLeader) { - // don't forward to ourself - leaderForAnyShard = true; - } else { - leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); - } - } - - outParams.remove("commit"); // this will be distributed from the local commit - - - if (params.get(UpdateRequest.MIN_REPFACT) != null) { - // TODO: Kept this for rolling upgrades. Remove in Solr 9 - outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); - } - cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null); - - if (!leaderForAnyShard) { - return; - } - - // change the phase to TOLEADER so we look up and forward to our own replicas (if any) - phase = DistribPhase.TOLEADER; - } - - List replicas = null; - - if (zkEnabled && DistribPhase.TOLEADER == phase) { - // This core should be a leader - isLeader = true; - replicas = setupRequestForDBQ(); - } else if (DistribPhase.FROMLEADER == phase) { - isLeader = false; - } - - - // check if client has requested minimum replication factor information. 
will set replicationTracker to null if - // we aren't the leader or subShardLeader - checkReplicationTracker(cmd); + setupRequest(cmd); + doDeleteByQuery(cmd, null, null); + } + /** + * should be called by implementing class after setting up replicas + * @param cmd delete command + * @param replicas list of Nodes replicas to pass to {@link DistributedUpdateProcessor#doDistribDeleteByQuery(DeleteUpdateCommand, List, DocCollection)} + * @param coll the collection in zookeeper {@link org.apache.solr.common.cloud.DocCollection}, + * passed to {@link DistributedUpdateProcessor#doDistribDeleteByQuery(DeleteUpdateCommand, List, DocCollection)} + */ + protected void doDeleteByQuery(DeleteUpdateCommand cmd, List replicas, DocCollection coll) throws IOException { if (vinfo == null) { super.processDelete(cmd); return; @@ -1654,78 +771,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // at this point, there is an update we need to try and apply. // we may or may not be the leader. 
- boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; - boolean leaderLogic = isLeader && !isReplayOrPeersync; versionDeleteByQuery(cmd); - if (zkEnabled) { - // forward to all replicas - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - boolean someReplicas = false; - boolean subShardLeader = false; - try { - subShardLeader = amISubShardLeader(coll, null, null, null); - if (subShardLeader) { - String myShardId = cloudDesc.getShardId(); - Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( - collection, myShardId); - // DBQ forwarded to NRT and TLOG replicas - List replicaProps = zkController.getZkStateReader() - .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - if (replicaProps != null) { - final List myReplicas = new ArrayList<>(replicaProps.size()); - for (ZkCoreNodeProps replicaProp : replicaProps) { - myReplicas.add(new StdNode(replicaProp, collection, myShardId)); - } - cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker); - someReplicas = true; - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); - } - if (leaderLogic) { - List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null); - if (subShardLeaders != null) { - cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker); - } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, 
null); - if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { - params = new ModifiableSolrParams(filterParams(req.getParams())); - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - params.set(DISTRIB_FROM_COLLECTION, collection); - params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); - - cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, rollupReplicationTracker, leaderReplicationTracker); - } - if (replicas != null) { - cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker); - someReplicas = true; - } - } - - if (someReplicas) { - cmdDistrib.blockAndDoRetries(); - } - } + doDistribDeleteByQuery(cmd, replicas, coll); if (returnVersions && rsp != null) { if (deleteByQueryResponse == null) { deleteByQueryResponse = new NamedList<>(1); - rsp.add("deleteByQuery",deleteByQueryResponse); + rsp.add("deleteByQuery", deleteByQueryResponse); } deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion()); } } + /** + * This runs after versionDeleteByQuery is invoked, should be used to tamper or forward DeleteCommand + * @param cmd delete command + * @param replicas list of Nodes replicas + * @param coll the collection in zookeeper {@link org.apache.solr.common.cloud.DocCollection}. 
+ * @throws IOException in case post processing failed + */ + protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List replicas, DocCollection coll) throws IOException { + // no-op for derived classes to implement + } + protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { // Find the version long versionOnUpdate = cmd.getVersion(); @@ -1780,46 +850,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - // internal helper method to tell if we are the leader for an add or deleteById update + // internal helper method to setup request by processors who use this class. // NOTE: not called by this class! - boolean isLeader(UpdateCommand cmd) { + void setupRequest(UpdateCommand cmd) { updateCommand = cmd; - - if (zkEnabled) { - zkCheck(); - if (cmd instanceof AddUpdateCommand) { - AddUpdateCommand acmd = (AddUpdateCommand)cmd; - nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument()); - } else if (cmd instanceof DeleteUpdateCommand) { - DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd; - nodes = setupRequest(dcmd.getId(), null); - } - } else { - isLeader = getNonZkLeaderAssumption(req); - } - - return isLeader; + isLeader = getNonZkLeaderAssumption(req); } - private void zkCheck() { - - // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be - // initiated, but existing streams carry on). This is why we check if the CC is shutdown. - // See SOLR-8203 and loop HdfsChaosMonkeyNothingIsSafeTest (and check for inconsistent shards) to test. 
- if (req.getCore().getCoreContainer().isShutDown()) { - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down."); - } - - if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { - // for log reply or peer sync, we don't need to be connected to ZK - return; - } - - if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) { - return; - } - - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled."); + /** + * + * @param id id of doc + * @return url of leader, or null if not found. + */ + protected String getLeaderUrl(String id) { + return req.getParams().get(DISTRIB_FROM); } protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException { @@ -1946,101 +990,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void processCommit(CommitUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); - - if (isReadOnly()) { - throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); - } - - updateCommand = cmd; - List nodes = null; - Replica leaderReplica = null; - if (zkEnabled) { - zkCheck(); - try { - leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e); - } - isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); - - nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true); - if (nodes == null) { - // This could happen if there are only pull replicas - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Unable to distribute commit operation. 
No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT); - } - - nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName()) - && node.getNodeProps().getCoreName().equals(req.getCore().getName())); - } - if (!zkEnabled || (!isLeader && req.getParams().get(COMMIT_END_POINT, "").equals("replicas"))) { - if (replicaType == Replica.Type.TLOG) { + updateCommand = cmd; - if (isLeader) { - long commitVersion = vinfo.getNewClock(); - cmd.setVersion(commitVersion); - doLocalCommit(cmd); - } - - } else if (replicaType == Replica.Type.PULL) { - log.warn("Commit not supported on replicas of type " + Replica.Type.PULL); - } else { - // NRT replicas will always commit - if (vinfo != null) { - long commitVersion = vinfo.getNewClock(); - cmd.setVersion(commitVersion); - } - - doLocalCommit(cmd); - } - } else { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - - List useNodes = null; - if (req.getParams().get(COMMIT_END_POINT) == null) { - useNodes = nodes; - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); - params.set(COMMIT_END_POINT, "leaders"); - if (useNodes != null) { - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - cmdDistrib.distribCommit(cmd, useNodes, params); - cmdDistrib.blockAndDoRetries(); - } - } - - if (isLeader) { - params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - - params.set(COMMIT_END_POINT, "replicas"); - - useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica); - - if (useNodes != null) { - params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( - zkController.getBaseUrl(), req.getCore().getName())); - - cmdDistrib.distribCommit(cmd, useNodes, params); - } - // NRT replicas will always commit - if (vinfo != null) { - long commitVersion = vinfo.getNewClock(); - cmd.setVersion(commitVersion); - } - - doLocalCommit(cmd); - if (useNodes != 
null) { - cmdDistrib.blockAndDoRetries(); - } - } - } + // replica type can only be NRT in standalone mode + // NRT replicas will always commit + doLocalCommit(cmd); } - private void doLocalCommit(CommitUpdateCommand cmd) throws IOException { + protected void doLocalCommit(CommitUpdateCommand cmd) throws IOException { if (vinfo != null) { + long commitVersion = vinfo.getNewClock(); + cmd.setVersion(commitVersion); vinfo.lockForUpdate(); } try { @@ -2058,67 +1020,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - @Override - public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { - if (isReadOnly()) { - throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); - } - super.processMergeIndexes(cmd); - } - - @Override - public void processRollback(RollbackUpdateCommand cmd) throws IOException { - if (isReadOnly()) { - throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); - } - super.processRollback(cmd); - } - @Override public void finish() throws IOException { + assertNotFinished(); + + super.finish(); + } + + protected void assertNotFinished() { assert ! 
finished : "lifecycle sanity check"; finished = true; - - if (zkEnabled) doFinish(); - - if (next != null && nodes == null) next.finish(); - } - - private List getCollectionUrls(String collection, EnumSet types, boolean onlyLeaders) { - ClusterState clusterState = zkController.getClusterState(); - final DocCollection docCollection = clusterState.getCollectionOrNull(collection); - if (collection == null || docCollection.getSlicesMap() == null) { - throw new ZooKeeperException(ErrorCode.BAD_REQUEST, - "Could not find collection in zk: " + clusterState); - } - Map slices = docCollection.getSlicesMap(); - final List urls = new ArrayList<>(slices.size()); - for (Map.Entry sliceEntry : slices.entrySet()) { - Slice replicas = slices.get(sliceEntry.getKey()); - if (onlyLeaders) { - Replica replica = docCollection.getLeader(replicas.getName()); - if (replica != null) { - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica); - urls.add(new StdNode(nodeProps, collection, replicas.getName())); - } - continue; - } - Map shardMap = replicas.getReplicasMap(); - - for (Entry entry : shardMap.entrySet()) { - if (!types.contains(entry.getValue().getType())) { - continue; - } - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); - if (clusterState.liveNodesContain(nodeProps.getNodeName())) { - urls.add(new StdNode(nodeProps, collection, replicas.getName())); - } - } - } - if (urls.isEmpty()) { - return null; - } - return urls; } /** diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java index 4addae03432..93c1bf2871e 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java @@ -49,9 +49,16 @@ public class DistributedUpdateProcessorFactory @Override public UpdateRequestProcessor 
getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + + final boolean isZkAware = req.getCore().getCoreContainer().isZooKeeperAware(); + + DistributedUpdateProcessor distribUpdateProcessor = + isZkAware ? + new DistributedZkUpdateProcessor(req, rsp, next) : + new DistributedUpdateProcessor(req, rsp, next); // note: will sometimes return DURP (no overhead) instead of wrapping return RoutedAliasUpdateProcessor.wrap(req, - new DistributedUpdateProcessor(req, rsp, next)); + distribUpdateProcessor); } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java new file mode 100644 index 00000000000..abe4754e0d9 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -0,0 +1,1235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.update.processor; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.ZkController; +import org.apache.solr.cloud.ZkShardTerms; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CompositeIdRouter; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.RoutingRule; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.DeleteUpdateCommand; +import 
org.apache.solr.update.MergeIndexesCommand; +import org.apache.solr.update.RollbackUpdateCommand; +import org.apache.solr.update.SolrCmdDistributor; +import org.apache.solr.update.SolrIndexSplitter; +import org.apache.solr.update.UpdateCommand; +import org.apache.solr.util.TestInjection; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + +public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { + + private final CloudDescriptor cloudDesc; + private final ZkController zkController; + private final SolrCmdDistributor cmdDistrib; + protected List nodes; + private Set skippedCoreNodeNames; + private final String collection; + private boolean readOnlyCollection = false; + + // should we clone the document before sending it to replicas? + // this is set to true in the constructor if the next processors in the chain + // are custom and may modify the SolrInputDocument racing with its serialization for replication + private final boolean cloneRequiredOnLeader; + + //used for keeping track of replicas that have processed an add/update from the leader + private RollupRequestReplicationTracker rollupReplicationTracker = null; + private LeaderRequestReplicationTracker leaderReplicationTracker = null; + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public DistributedZkUpdateProcessor(SolrQueryRequest req, + SolrQueryResponse rsp, UpdateRequestProcessor next) { + super(req, rsp, next); + CoreContainer cc = req.getCore().getCoreContainer(); + cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor(); + zkController = cc.getZkController(); + cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); + cloneRequiredOnLeader = isCloneRequiredOnLeader(next); + collection = cloudDesc.getCollectionName(); + DocCollection coll = 
zkController.getClusterState().getCollectionOrNull(collection); + if (coll != null) { + // check readOnly property in coll state + readOnlyCollection = coll.isReadOnly(); + } + } + + private boolean isReadOnly() { + return readOnlyCollection || req.getCore().readOnly; + } + + private boolean isCloneRequiredOnLeader(UpdateRequestProcessor next) { + boolean shouldClone = false; + UpdateRequestProcessor nextInChain = next; + while (nextInChain != null) { + Class klass = nextInChain.getClass(); + if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class + && klass != RunUpdateProcessor.class + && klass != TolerantUpdateProcessor.class) { + shouldClone = true; + break; + } + nextInChain = nextInChain.next; + } + return shouldClone; + } + + @Override + protected Replica.Type computeReplicaType() { + // can't use cloudDesc since this is called by super class, before the constructor instantiates cloudDesc. + return req.getCore().getCoreDescriptor().getCloudDescriptor().getReplicaType(); + } + + @Override + public void processCommit(CommitUpdateCommand cmd) throws IOException { + + assert TestInjection.injectFailUpdateRequests(); + + if (isReadOnly()) { + throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); + } + + updateCommand = cmd; + + List nodes = null; + Replica leaderReplica = null; + zkCheck(); + try { + leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e); + } + isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); + + nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true); + if (nodes == null) { + // This could happen if there are only pull replicas + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Unable 
to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT); + } + + nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName()) + && node.getNodeProps().getCoreName().equals(req.getCore().getName())); + + if (!isLeader && req.getParams().get(COMMIT_END_POINT, "").equals("replicas")) { + if (replicaType == Replica.Type.PULL) { + log.warn("Commit not supported on replicas of type " + Replica.Type.PULL); + } else if (replicaType == Replica.Type.NRT) { + doLocalCommit(cmd); + } + } else { + // zk + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + + List useNodes = null; + if (req.getParams().get(COMMIT_END_POINT) == null) { + useNodes = nodes; + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); + params.set(COMMIT_END_POINT, "leaders"); + if (useNodes != null) { + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + cmdDistrib.distribCommit(cmd, useNodes, params); + cmdDistrib.blockAndDoRetries(); + } + } + + if (isLeader) { + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + + params.set(COMMIT_END_POINT, "replicas"); + + useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica); + + if (useNodes != null) { + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + cmdDistrib.distribCommit(cmd, useNodes, params); + } + + doLocalCommit(cmd); + + if (useNodes != null) { + cmdDistrib.blockAndDoRetries(); + } + } + } + } + + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { + assert TestInjection.injectFailUpdateRequests(); + + if (isReadOnly()) { + throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); + } + + setupRequest(cmd); + + // check if client has requested minimum replication factor information. 
will set replicationTracker to null if + // we aren't the leader or subShardLeader + checkReplicationTracker(cmd); + + super.processAdd(cmd); + } + + @Override + protected void doDistribAdd(AddUpdateCommand cmd) throws IOException { + + if (isLeader && !isSubShardLeader) { + DocCollection coll = zkController.getClusterState().getCollection(collection); + List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument()); + // the list will actually have only one element for an add request + if (subShardLeaders != null && !subShardLeaders.isEmpty()) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); + cmdDistrib.distribAdd(cmd, subShardLeaders, params, true); + } + final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument()); + if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + params.set(DISTRIB_FROM_COLLECTION, collection); + params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); + cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true); + } + } + + if (nodes != null) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, + (isLeader || isSubShardLeader ? 
+ DistribPhase.FROMLEADER.toString() : + DistribPhase.TOLEADER.toString())); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { + // TODO: Kept for rolling upgrades only. Should be removed in Solr 9 + params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); + } + + if (cmd.isInPlaceUpdate()) { + params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion)); + + // Use synchronous=true so that a new connection is used, instead + // of the update being streamed through an existing streaming client. + // When using a streaming client, the previous update + // and the current in-place update (that depends on the previous update), if reordered + // in the stream, can result in the current update being bottled up behind the previous + // update in the stream and can lead to degraded performance. + cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker); + } else { + cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); + } + } + } + + @Override + public void processDelete(DeleteUpdateCommand cmd) throws IOException { + if (isReadOnly()) { + throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); + } + + super.processDelete(cmd); + } + + @Override + protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException { + setupRequest(cmd); + + // check if client has requested minimum replication factor information. 
will set replicationTracker to null if + // we aren't the leader or subShardLeader + checkReplicationTracker(cmd); + + super.doDeleteById(cmd); + } + + @Override + protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException { + if (isLeader && !isSubShardLeader) { + DocCollection coll = zkController.getClusterState().getCollection(collection); + List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null); + // the list will actually have only one element for a delete-by-id request + if (subShardLeaders != null && !subShardLeaders.isEmpty()) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); + cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null); + } + + final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null); + if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + params.set(DISTRIB_FROM_COLLECTION, collection); + params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); + cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, null, null); + } + } + + if (nodes != null) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, + (isLeader || isSubShardLeader ?
DistribPhase.FROMLEADER.toString() + : DistribPhase.TOLEADER.toString())); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { + // TODO: Kept for rolling upgrades only. Remove in Solr 9 + params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); + } + cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker); + } + } + + @Override + protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { + zkCheck(); + + // NONE: we are the first to receive this deleteByQuery + // - it must be forwarded to the leader of every shard + // TO: we are a leader receiving a forwarded deleteByQuery... we must: + // - block all updates (use VersionInfo) + // - flush *all* updates going to our replicas + // - forward the DBQ to our replicas and wait for the response + // - log + execute the local DBQ + // FROM: we are a replica receiving a DBQ from our leader + // - log + execute the local DBQ + DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); + + DocCollection coll = zkController.getClusterState().getCollection(collection); + + if (DistribPhase.NONE == phase) { + if (rollupReplicationTracker == null) { + rollupReplicationTracker = new RollupRequestReplicationTracker(); + } + boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard + + ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams())); + outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); + outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + SolrParams params = req.getParams(); + String route = params.get(ShardParams._ROUTE_); + Collection slices = coll.getRouter().getSearchSlices(route, params, coll); + + List leaders = new 
ArrayList<>(slices.size()); + for (Slice slice : slices) { + String sliceName = slice.getName(); + Replica leader; + try { + leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName); + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e); + } + + // TODO: What if leaders changed in the meantime? + // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? + + // Am I the leader for this slice? + ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader); + String leaderCoreNodeName = leader.getName(); + String coreNodeName = cloudDesc.getCoreNodeName(); + isLeader = coreNodeName.equals(leaderCoreNodeName); + + if (isLeader) { + // don't forward to ourself + leaderForAnyShard = true; + } else { + leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); + } + } + + outParams.remove("commit"); // this will be distributed from the local commit + + + if (params.get(UpdateRequest.MIN_REPFACT) != null) { + // TODO: Kept this for rolling upgrades. Remove in Solr 9 + outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT)); + } + cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null); + + if (!leaderForAnyShard) { + return; + } + + // change the phase to TOLEADER so we look up and forward to our own replicas (if any) + phase = DistribPhase.TOLEADER; + } + List replicas = null; + + if (DistribPhase.TOLEADER == phase) { + // This core should be a leader + isLeader = true; + replicas = setupRequestForDBQ(); + } else if (DistribPhase.FROMLEADER == phase) { + isLeader = false; + } + + // check if client has requested minimum replication factor information. 
will set replicationTracker to null if + // we aren't the leader or subShardLeader + checkReplicationTracker(cmd); + super.doDeleteByQuery(cmd, replicas, coll); + } + + @Override + protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List replicas, + DocCollection coll) throws IOException { + + boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; + boolean leaderLogic = isLeader && !isReplayOrPeersync; + + // forward to all replicas + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + + boolean someReplicas = false; + boolean subShardLeader = false; + try { + subShardLeader = amISubShardLeader(coll, null, null, null); + if (subShardLeader) { + String myShardId = cloudDesc.getShardId(); + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( + collection, myShardId); + // DBQ forwarded to NRT and TLOG replicas + List replicaProps = zkController.getZkStateReader() + .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); + if (replicaProps != null) { + final List myReplicas = new ArrayList<>(replicaProps.size()); + for (ZkCoreNodeProps replicaProp : replicaProps) { + myReplicas.add(new SolrCmdDistributor.StdNode(replicaProp, collection, myShardId)); + } + cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker); + someReplicas = true; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } + if (leaderLogic) { + List subShardLeaders = getSubShardLeaders(coll, 
cloudDesc.getShardId(), null, null); + if (subShardLeaders != null) { + cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker); + } + final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null); + if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { + params = new ModifiableSolrParams(filterParams(req.getParams())); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( + zkController.getBaseUrl(), req.getCore().getName())); + params.set(DISTRIB_FROM_COLLECTION, collection); + params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId()); + + cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, rollupReplicationTracker, leaderReplicationTracker); + } + if (replicas != null) { + cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker); + someReplicas = true; + } + } + + if (someReplicas) { + cmdDistrib.blockAndDoRetries(); + } + } + + // used for deleteByQuery to get the list of nodes this leader should forward to + private List setupRequestForDBQ() { + List nodes = null; + String shardId = cloudDesc.getShardId(); + + try { + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); + isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); + + // TODO: what if we are no longer the leader? 
+ + forwardToLeader = false; + List replicaProps = zkController.getZkStateReader() + .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); + if (replicaProps != null) { + nodes = new ArrayList<>(replicaProps.size()); + for (ZkCoreNodeProps props : replicaProps) { + nodes.add(new SolrCmdDistributor.StdNode(props, collection, shardId)); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } + + return nodes; + } + + @Override + protected String getLeaderUrl(String id) { + // try to get the leader from the req params, falling back to a zk lookup if not found. + String distribFrom = req.getParams().get(DISTRIB_FROM); + if(distribFrom != null) { + return distribFrom; + } + return getLeaderUrlZk(id); + } + + private String getLeaderUrlZk(String id) { + // An update we're dependent upon didn't arrive! This is unexpected. Most likely our leader is + // down or partitioned from us for some reason. Let's force a refresh of the cluster state, and ask the + // leader for the update. + if (zkController == null) { // we should be in cloud mode, but wtf?
could be a unit test + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader " + + "failed since we're not in cloud mode."); + } + try { + return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()).getCoreUrl(); + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e); + } + } + + @Override + void setupRequest(UpdateCommand cmd) { + updateCommand = cmd; + zkCheck(); + if (cmd instanceof AddUpdateCommand) { + AddUpdateCommand acmd = (AddUpdateCommand)cmd; + nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument()); + } else if (cmd instanceof DeleteUpdateCommand) { + DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd; + nodes = setupRequest(dcmd.getId(), null); + } + } + + protected List setupRequest(String id, SolrInputDocument doc) { + return setupRequest(id, doc, null); + } + + protected List setupRequest(String id, SolrInputDocument doc, String route) { + // if we are in zk mode... + + assert TestInjection.injectUpdateRandomPause(); + + if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { + isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. + forwardToLeader = false; + return null; + } + + ClusterState cstate = zkController.getClusterState(); + DocCollection coll = cstate.getCollection(collection); + Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); + + if (slice == null) { + // No slice found. Most strict routers will have already thrown an exception, so a null return is + // a signal to use the slice of this core. + // TODO: what if this core is not in the targeted collection? 
+ String shardId = cloudDesc.getShardId(); + slice = coll.getSlice(shardId); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll); + } + } + + DistribPhase phase = + DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); + + if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) { + if (cloudDesc.isLeader()) { + // locally we think we are leader but the request says it came FROMLEADER + // that could indicate a problem, let the full logic below figure it out + } else { + + assert TestInjection.injectFailReplicaRequests(); + + isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. + forwardToLeader = false; + return null; + } + } + + String shardId = slice.getName(); + + try { + // Not equivalent to getLeaderProps, which retries to find a leader. + // Replica leader = slice.getLeader(); + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); + isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); + + if (!isLeader) { + isSubShardLeader = amISubShardLeader(coll, slice, id, doc); + if (isSubShardLeader) { + shardId = cloudDesc.getShardId(); + leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId); + } + } + + doDefensiveChecks(phase); + + // if request is coming from another collection then we want it to be sent to all replicas + // even if its phase is FROMLEADER + String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); + + if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) { + // we are coming from the leader, just go local - add no urls + forwardToLeader = false; + return null; + } else if (isLeader || isSubShardLeader) { + // that means I want to forward onto my replicas... + // so get the replicas... 
+ forwardToLeader = false; + ClusterState clusterState = zkController.getZkStateReader().getClusterState(); + String leaderCoreNodeName = leaderReplica.getName(); + List replicas = clusterState.getCollection(collection) + .getSlice(shardId) + .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); + replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); + if (replicas.isEmpty()) { + return null; + } + + // check for test param that lets us miss replicas + String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); + Set skipListSet = null; + if (skipList != null) { + skipListSet = new HashSet<>(skipList.length); + skipListSet.addAll(Arrays.asList(skipList)); + log.info("test.distrib.skip.servers was found and contains:" + skipListSet); + } + + List nodes = new ArrayList<>(replicas.size()); + skippedCoreNodeNames = new HashSet<>(); + ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); + for (Replica replica: replicas) { + String coreNodeName = replica.getName(); + if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { + log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); + } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { + log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); + skippedCoreNodeNames.add(replica.getName()); + } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { + skippedCoreNodeNames.add(replica.getName()); + } else { + nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers)); + } + } + return nodes; + + } else { + // I need to forward on to the leader... 
+ forwardToLeader = true; + return Collections.singletonList( + new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward)); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } + } + + @Override + protected boolean shouldCloneCmdDoc() { + boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; + return willDistrib & cloneRequiredOnLeader; + } + + // helper method, processAdd was getting a bit large. + // Lazily allocates the replication trackers; an already-allocated tracker is reused, never reset to null + // We have two possibilities here: + // + // 1> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker + // 2> we're a follower: allocate a RollupTracker + // + private void checkReplicationTracker(UpdateCommand cmd) { + + SolrParams rp = cmd.getReq().getParams(); + String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM); + // Ok, we're receiving the original request, we need a rollup tracker, but only one so we accumulate over the + // course of a batch. + if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) && + rollupReplicationTracker == null) { + rollupReplicationTracker = new RollupRequestReplicationTracker(); + } + // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in + // a batch we need to use the _same_ leader replication tracker.
+ if (isLeader && leaderReplicationTracker == null) { + leaderReplicationTracker = new LeaderRequestReplicationTracker( + req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + } + } + + + private List getCollectionUrls(String collection, EnumSet types, boolean onlyLeaders) { + ClusterState clusterState = zkController.getClusterState(); + final DocCollection docCollection = clusterState.getCollectionOrNull(collection); + if (collection == null || docCollection.getSlicesMap() == null) { + throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST, + "Could not find collection in zk: " + clusterState); + } + Map slices = docCollection.getSlicesMap(); + final List urls = new ArrayList<>(slices.size()); + for (Map.Entry sliceEntry : slices.entrySet()) { + Slice replicas = slices.get(sliceEntry.getKey()); + if (onlyLeaders) { + Replica replica = docCollection.getLeader(replicas.getName()); + if (replica != null) { + ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica); + urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName())); + } + continue; + } + Map shardMap = replicas.getReplicasMap(); + + for (Map.Entry entry : shardMap.entrySet()) { + if (!types.contains(entry.getValue().getType())) { + continue; + } + ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); + if (clusterState.liveNodesContain(nodeProps.getNodeName())) { + urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName())); + } + } + } + if (urls.isEmpty()) { + return null; + } + return urls; + } + + /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ + private boolean couldIbeSubShardLeader(DocCollection coll) { + // Could I be the leader of a shard in "construction/recovery" state? 
+ String myShardId = cloudDesc.getShardId(); + Slice mySlice = coll.getSlice(myShardId); + Slice.State state = mySlice.getState(); + return state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY; + } + + /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ + protected boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException { + // Am I the leader of a shard in "construction/recovery" state? + String myShardId = cloudDesc.getShardId(); + Slice mySlice = coll.getSlice(myShardId); + final Slice.State state = mySlice.getState(); + if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { + Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId); + boolean amILeader = myLeader.getName().equals(cloudDesc.getCoreNodeName()); + if (amILeader) { + // Does the document belong to my hash range as well? + DocRouter.Range myRange = mySlice.getRange(); + if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); + if (parentSlice != null) { + boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange()); + return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll); + } else { + // delete by query case -- as long as I am a sub shard leader we're fine + return true; + } + } + } + return false; + } + + protected List getReplicaNodesForLeader(String shardId, Replica leaderReplica) { + ClusterState clusterState = zkController.getZkStateReader().getClusterState(); + String leaderCoreNodeName = leaderReplica.getName(); + List replicas = clusterState.getCollection(collection) + .getSlice(shardId) + .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); + replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); + if (replicas.isEmpty()) { + return null; + } + + // check for test param that 
lets us miss replicas + String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); + Set skipListSet = null; + if (skipList != null) { + skipListSet = new HashSet<>(skipList.length); + skipListSet.addAll(Arrays.asList(skipList)); + log.info("test.distrib.skip.servers was found and contains:" + skipListSet); + } + + List nodes = new ArrayList<>(replicas.size()); + skippedCoreNodeNames = new HashSet<>(); + ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); + for (Replica replica : replicas) { + String coreNodeName = replica.getName(); + if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { + log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true"); + } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { + log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); + skippedCoreNodeNames.add(replica.getName()); + } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) + || replica.getState() == Replica.State.DOWN) { + skippedCoreNodeNames.add(replica.getName()); + } else { + nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId)); + } + } + return nodes; + } + + /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */ + protected List getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) { + Collection allSlices = coll.getSlices(); + List nodes = null; + for (Slice aslice : allSlices) { + final Slice.State state = aslice.getState(); + if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) { + DocRouter.Range myRange = coll.getSlice(shardId).getRange(); + if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); + boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange); + if (isSubset && + (docId == null 
// in case of deletes + || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) { + Replica sliceLeader = aslice.getLeader(); + // slice leader can be null because node/shard is created zk before leader election + if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) { + if (nodes == null) nodes = new ArrayList<>(); + ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader); + nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName())); + } + } + } + } + return nodes; + } + + /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE} */ + protected List getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) { + DocRouter router = coll.getRouter(); + List nodes = null; + if (router instanceof CompositeIdRouter) { + CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; + String myShardId = cloudDesc.getShardId(); + Slice slice = coll.getSlice(myShardId); + Map routingRules = slice.getRoutingRules(); + if (routingRules != null) { + + // delete by query case + if (id == null) { + for (Map.Entry entry : routingRules.entrySet()) { + String targetCollectionName = entry.getValue().getTargetCollectionName(); + final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName); + if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) { + final Slice[] activeSlices = docCollection.getActiveSlicesArr(); + Slice any = activeSlices[0]; + if (nodes == null) nodes = new ArrayList<>(); + nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(any.getLeader()))); + } + } + return nodes; + } + + String routeKey = SolrIndexSplitter.getRouteKey(id); + if (routeKey != null) { + RoutingRule rule = routingRules.get(routeKey + "!"); + if (rule != null) { + if (! 
rule.isExpired()) { + List ranges = rule.getRouteRanges(); + if (ranges != null && !ranges.isEmpty()) { + int hash = compositeIdRouter.sliceHash(id, doc, null, coll); + for (DocRouter.Range range : ranges) { + if (range.includes(hash)) { + DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName()); + Collection activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl); + if (activeSlices == null || activeSlices.isEmpty()) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName()); + } + Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName()); + nodes = new ArrayList<>(1); + nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(targetLeader))); + break; + } + } + } + } else { + ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock(); + if (!ruleExpiryLock.isLocked()) { + try { + if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) { + log.info("Going to expire routing rule"); + try { + Map map = Utils.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(), + ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.SHARD_ID_PROP, myShardId, + "routeKey", routeKey + "!"); + zkController.getOverseer().offerStateUpdate(Utils.toJSON(map)); + } catch (KeeperException e) { + log.warn("Exception while removing routing rule for route key: " + routeKey, e); + } catch (Exception e) { + log.error("Exception while removing routing rule for route key: " + routeKey, e); + } finally { + ruleExpiryLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + } + } + } + return nodes; + } + + private void doDefensiveChecks(DistribPhase phase) { + boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; + if (isReplayOrPeersync) return; + + String from 
= req.getParams().get(DISTRIB_FROM); + ClusterState clusterState = zkController.getClusterState(); + + DocCollection docCollection = clusterState.getCollection(collection); + Slice mySlice = docCollection.getSlice(cloudDesc.getShardId()); + boolean localIsLeader = cloudDesc.isLeader(); + if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay + String fromShard = req.getParams().get(DISTRIB_FROM_PARENT); + if (fromShard != null) { + if (mySlice.getState() == Slice.State.ACTIVE) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "Request says it is coming from parent shard leader but we are in active state"); + } + // shard splitting case -- check ranges to see if we are a sub-shard + Slice fromSlice = docCollection.getSlice(fromShard); + DocRouter.Range parentRange = fromSlice.getRange(); + if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); + if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "Request says it is coming from parent shard leader but parent hash range is not superset of my range"); + } + } else { + String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule? 
+ if (fromCollection == null) { + log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); + SolrException solrExc = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader"); + solrExc.setMetadata("cause", "LeaderChanged"); + throw solrExc; + } + } + } + + int count = 0; + while (((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) && count < 5) { + count++; + // re-getting localIsLeader since we published to ZK first before setting localIsLeader value + localIsLeader = cloudDesc.isLeader(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) { + log.error("ClusterState says we are the leader, but locally we don't think so"); + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "ClusterState says we are the leader (" + zkController.getBaseUrl() + + "/" + req.getCore().getName() + "), but locally we don't think so. Request came from " + from); + } + } + + @Override + protected void doClose() { + if (cmdDistrib != null) { + cmdDistrib.close(); + } + } + + @Override + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + if (isReadOnly()) { + throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); + } + super.processMergeIndexes(cmd); + } + + @Override + public void processRollback(RollbackUpdateCommand cmd) throws IOException { + if (isReadOnly()) { + throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only."); + } + super.processRollback(cmd); + } + + @Override + public void finish() throws IOException { + assertNotFinished(); + + doFinish(); + } + + // TODO: optionally fail if n replicas are not reached... 
+ private void doFinish() { + boolean shouldUpdateTerms = isLeader && isIndexChanged; + if (shouldUpdateTerms) { + ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()); + if (skippedCoreNodeNames != null) { + zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames); + } + zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero(); + } + // TODO: if not a forward and replication req is not specified, we could + // send in a background thread + + cmdDistrib.finish(); + List errors = cmdDistrib.getErrors(); + // TODO - we may need to tell about more than one error... + + List errorsForClient = new ArrayList<>(errors.size()); + Set replicasShouldBeInLowerTerms = new HashSet<>(); + for (final SolrCmdDistributor.Error error : errors) { + + if (error.req.node instanceof SolrCmdDistributor.ForwardNode) { + // if it's a forward, any fail is a problem - + // otherwise we assume things are fine if we got it locally + // until we start allowing min replication param + errorsForClient.add(error); + continue; + } + + // else... 
+ + // for now we don't error - we assume if it was added locally, we + // succeeded + if (log.isWarnEnabled()) { + log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e); + } + + // Since it is not a forward request, for each fail, try to tell them to + // recover - the doc was already added locally, so it should have been + // legit + + DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM)); + if (phase != DistribPhase.FROMLEADER) + continue; // don't have non-leaders try to recovery other nodes + + // commits are special -- they can run on any node irrespective of whether it is a leader or not + // we don't want to run recovery on a node which missed a commit command + if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) + continue; + + final String replicaUrl = error.req.node.getUrl(); + + // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request + String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null; + if ("LeaderChanged".equals(cause)) { + // let's just fail this request and let the client retry? or just call processAdd again? + log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+ + " now thinks it is the leader! Failing the request to let the client retry! "+error.e); + errorsForClient.add(error); + continue; + } + + String collection = null; + String shardId = null; + + if (error.req.node instanceof SolrCmdDistributor.StdNode) { + SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode)error.req.node; + collection = stdNode.getCollection(); + shardId = stdNode.getShardId(); + + // before we go setting other replicas to down, make sure we're still the leader! 
+ String leaderCoreNodeName = null; + Exception getLeaderExc = null; + Replica leaderProps = null; + try { + leaderProps = zkController.getZkStateReader().getLeader(collection, shardId); + if (leaderProps != null) { + leaderCoreNodeName = leaderProps.getName(); + } + } catch (Exception exc) { + getLeaderExc = exc; + } + if (leaderCoreNodeName == null) { + log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " + + "before putting {} into leader-initiated recovery", + cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc); + } + + List myReplicas = zkController.getZkStateReader().getReplicaProps(collection, + cloudDesc.getShardId(), cloudDesc.getCoreNodeName()); + boolean foundErrorNodeInReplicaList = false; + if (myReplicas != null) { + for (ZkCoreNodeProps replicaProp : myReplicas) { + if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) { + foundErrorNodeInReplicaList = true; + break; + } + } + } + + if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader + && foundErrorNodeInReplicaList // we found an error for one of replicas + && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR + try { + String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName(); + // if false, then the node is probably not "live" anymore + // and we do not need to send a recovery message + Throwable rootCause = SolrException.getRootCause(error.e); + log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause); + replicasShouldBeInLowerTerms.add(coreNodeName); + } catch (Exception exc) { + Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc); + log.error("Leader failed to set replica " + + error.req.node.getUrl() + " state to DOWN due to: 
" + setLirZnodeFailedCause, setLirZnodeFailedCause); + } + } else { + // not the leader anymore maybe or the error'd node is not my replica? + if (!foundErrorNodeInReplicaList) { + log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+ + cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " + + "No request recovery command will be sent!"); + if (!shardId.equals(cloudDesc.getShardId())) { + // some replicas on other shard did not receive the updates (ex: during splitshard), + // exception must be notified to clients + errorsForClient.add(error); + } + } else { + log.warn("Core " + cloudDesc.getCoreNodeName() + " is no longer the leader for " + collection + " " + + shardId + " or we tried to put ourself into LIR, no request recovery command will be sent!"); + } + } + } + } + if (!replicasShouldBeInLowerTerms.isEmpty()) { + zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()) + .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms); + } + handleReplicationFactor(); + if (0 < errorsForClient.size()) { + throw new DistributedUpdatesAsyncException(errorsForClient); + } + } + + /** + * If necessary, include in the response the achieved replication factor + */ + @SuppressWarnings("deprecation") + private void handleReplicationFactor() { + if (leaderReplicationTracker != null || rollupReplicationTracker != null) { + int achievedRf = Integer.MAX_VALUE; + + if (leaderReplicationTracker != null) { + + achievedRf = leaderReplicationTracker.getAchievedRf(); + + // Transfer this to the rollup tracker if it exists + if (rollupReplicationTracker != null) { + rollupReplicationTracker.testAndSetAchievedRf(achievedRf); + } + } + + // Rollup tracker has accumulated stats. 
+ if (rollupReplicationTracker != null) { + achievedRf = rollupReplicationTracker.getAchievedRf(); + } + if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) { + // Unused, but kept for back compatibility. To be removed in Solr 9 + rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT))); + } + rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf); + rollupReplicationTracker = null; + leaderReplicationTracker = null; + + } + } + + private void zkCheck() { + + // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be + // initiated, but existing streams carry on). This is why we check if the CC is shutdown. + // See SOLR-8203 and loop HdfsChaosMonkeyNothingIsSafeTest (and check for inconsistent shards) to test. + if (req.getCore().getCoreContainer().isShutDown()) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down."); + } + + if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { + // for log replay or peer sync, we don't need to be connected to ZK + return; + } + + if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) { + return; + } + + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled."); + } +} diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java index ef9f8de5e60..125724b9e35 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java @@ -368,7 +368,8 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor return true; } // if phase==TOLEADER, we can't just
assume we are the leader... let the normal logic check. - return !distribProc.isLeader(cmd); + distribProc.setupRequest(cmd); + return !distribProc.isLeader(); } @Override diff --git a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java index ca31897afe4..7fc33d7abad 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java @@ -216,7 +216,8 @@ public class SkipExistingDocumentsProcessorFactory extends UpdateRequestProcesso if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) { return false; } - return distribProc.isLeader(cmd); + distribProc.setupRequest(cmd); + return distribProc.isLeader(); } @Override diff --git a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java index 7d377fa9e09..2a2084693ef 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java @@ -232,7 +232,7 @@ public class AtomicUpdateProcessorFactoryTest extends SolrTestCaseJ4 { try { factory.getInstance(cmd.getReq(), new SolrQueryResponse(), - new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(), + createDistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(), new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd); } catch (IOException e) { } diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java index 8f56d689fe5..a4c54d10783 100644 --- 
a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java @@ -34,11 +34,10 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 { } @Test - public void testShouldBufferUpdate() { + public void testShouldBufferUpdateZk() { SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams()); DistributedUpdateProcessor processor = new DistributedUpdateProcessor( req, null, null, null); - AddUpdateCommand cmd = new AddUpdateCommand(req); // applying buffer updates, isReplayOrPeerSync flag doesn't matter assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); @@ -50,5 +49,4 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 { assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); } - } diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index 18b6ec17ea1..7e1ee0ae2c7 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -131,6 +131,9 @@ import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.servlet.DirectSolrConnection; +import org.apache.solr.update.processor.DistributedUpdateProcessor; +import org.apache.solr.update.processor.DistributedZkUpdateProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.util.LogLevel; import org.apache.solr.util.RandomizeSSL; import org.apache.solr.util.RandomizeSSL.SSLRandomizer; @@ -2891,6 +2894,14 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase { } } + + public static DistributedUpdateProcessor createDistributedUpdateProcessor(SolrQueryRequest 
req, SolrQueryResponse rsp, + UpdateRequestProcessor next) { + if(h.getCoreContainer().isZooKeeperAware()) { + return new DistributedZkUpdateProcessor(req, rsp, next); + } + return new DistributedUpdateProcessor(req, rsp, next); + } /** * Cleans up the randomized sysproperties and variables set by {@link #randomizeNumericTypesProperties}