From 27f5a8b290e92255caf74d7056973d616a3faed4 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Tue, 14 Feb 2012 02:45:41 +0000 Subject: [PATCH] solrcloud: send deleteByQuery to all shard leaders to version and forward to replicas git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1243768 13f79535-47bb-0310-9956-ffa450edef68 --- .../processor/DistributedUpdateProcessor.java | 300 ++++++++++++------ 1 file changed, 201 insertions(+), 99 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 58e0860b4f2..a3c01fae675 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -60,14 +60,15 @@ import org.apache.solr.update.UpdateHandler; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.VersionBucket; import org.apache.solr.update.VersionInfo; +import org.apache.zookeeper.KeeperException; // NOT mt-safe... create a new processor for each add thread // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for public class DistributedUpdateProcessor extends UpdateRequestProcessor { public static final String SEEN_LEADER = "leader"; public static final String COMMIT_END_POINT = "commit_end_point"; - public static final String DELQUERY_END_POINT = "delquery_end_point"; - + public static final String DELETE_BY_QUERY_LEVEL = "dbq_level"; + private final SolrQueryRequest req; private final SolrQueryResponse rsp; private final UpdateRequestProcessor next; @@ -91,6 +92,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private boolean zkEnabled = false; + private CloudDescriptor cloudDesc; private String collection; private ZkController zkController; @@ -128,7 +130,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController(); - CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor(); + cloudDesc = coreDesc.getCloudDescriptor(); if (cloudDesc != null) { collection = cloudDesc.getCollectionName(); @@ -192,7 +194,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return nodes; } - + + private String getShard(int hash, String collection, CloudState cloudState) { // ranges should be part of the cloud state and eventually gotten from zk @@ -200,6 +203,43 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return cloudState.getShard(hash, collection); } + // used for deleteByQyery to get the list of nodes this leader should forward to + private List setupRequest() { + List nodes = null; + String shardId = cloudDesc.getShardId(); + + try { + + ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps( + collection, shardId)); + + String leaderNodeName = leaderProps.getCoreNodeName(); + String coreName = req.getCore().getName(); + String coreNodeName = zkController.getNodeName() + "_" + coreName; + isLeader = coreNodeName.equals(leaderNodeName); + + // TODO: what if we are no longer the leader? + + forwardToLeader = false; + List replicaProps = zkController.getZkStateReader() + .getReplicaProps(collection, shardId, zkController.getNodeName(), + coreName); + if (replicaProps != null) { + nodes = new ArrayList(replicaProps.size()); + for (ZkCoreNodeProps props : replicaProps) { + nodes.add(new StdNode(props)); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } + + return nodes; + } + + @Override public void processAdd(AddUpdateCommand cmd) throws IOException { // TODO: check for id field? @@ -419,17 +459,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { @Override public void processDelete(DeleteUpdateCommand cmd) throws IOException { if (!cmd.isDeleteById()) { - // delete by query... - // TODO: handle versioned and distributed deleteByQuery - - // even in non zk mode, tests simulate updates from a leader - if(!zkEnabled) { - isLeader = !req.getParams().getBool(SEEN_LEADER, false); - } else { - zkCheck(); - } - - processDeleteByQuery(cmd); + doDeleteByQuery(cmd); return; } @@ -475,6 +505,161 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { + // even in non zk mode, tests simulate updates from a leader + if(!zkEnabled) { + isLeader = !req.getParams().getBool(SEEN_LEADER, false); + } else { + zkCheck(); + } + + // Lev1: we are the first to receive this deleteByQuery, it must be forwarded to the leader of every shard + // Lev2: we are a leader receiving a forwarded deleteByQuery... we must: + // - block all updates (use VersionInfo) + // - flush *all* updates going to our replicas + // - forward the DBQ to our replicas and wait for the response + // - log + execute the local DBQ + // Lev3: we are a replica receiving a DBQ from our leader + // - log + execute the local DBQ + + int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1); + + if (zkEnabled && dbqlevel == 1) { + boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard + + Map slices = zkController.getCloudState().getSlices(collection); + + ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); + params.set("dbqlevel", 2); + + List leaders = new ArrayList(slices.size()); + for (Map.Entry sliceEntry : slices.entrySet()) { + String sliceName = sliceEntry.getKey(); + ZkNodeProps leaderProps; + try { + leaderProps = zkController.getZkStateReader().getLeaderProps(collection, sliceName); + } catch (InterruptedException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + sliceName, e); + } + + // TODO: What if leaders changed in the meantime? + // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader? + + // Am I the leader for this slice? + ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leaderProps); + String leaderNodeName = coreLeaderProps.getCoreNodeName(); + String coreName = req.getCore().getName(); + String coreNodeName = zkController.getNodeName() + "_" + coreName; + isLeader = coreNodeName.equals(leaderNodeName); + + if (isLeader) { + // don't forward to ourself + leaderForAnyShard = true; + } else { + leaders.add(new StdNode(coreLeaderProps)); + } + } + + cmdDistrib.distribDelete(cmd, leaders, params); + + if (!leaderForAnyShard) { + return; + } + + // change the level to 2 so we look up and forward to our own replicas (if any) + dbqlevel = 2; + } + + List replicas = null; + + if (zkEnabled && dbqlevel == 2) { + // This core should be a leader + replicas = setupRequest(); + } + + if (vinfo == null) { + super.processDelete(cmd); + return; + } + + // at this point, there is an update we need to try and apply. + // we may or may not be the leader. + + // Find the version + long versionOnUpdate = cmd.getVersion(); + if (versionOnUpdate == 0) { + String versionOnUpdateS = req.getParams().get(VERSION_FIELD); + versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); + } + versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version + + boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0; + boolean leaderLogic = isLeader && !isReplay; + + if (!leaderLogic && versionOnUpdate==0) { + throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader"); + } + + vinfo.blockUpdates(); + try { + + if (versionsStored) { + if (leaderLogic) { + long version = vinfo.getNewClock(); + cmd.setVersion(-version); + // TODO update versions in all buckets + + // TODO: flush any adds to these replicas so they do not get reordered w.r.t. this DBQ + + doLocalDelete(cmd); + + // forward to all replicas + if (replicas != null) { + ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); + params.set("dbqlevel", 3); + params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); + params.set(SEEN_LEADER, "true"); + cmdDistrib.distribDelete(cmd, replicas, params); + + // wait for DBQ responses before releasing the update block to eliminate the possibility + // of an add being reordered. + // TODO: this isn't strictly necessary - we could do the same thing we do for PeerSync + // in DUH2 and add a clause that prevents deleting older docs. + cmdDistrib.finish(); + } + + } else { + cmd.setVersion(-versionOnUpdate); + + if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.deleteByQuery(cmd); + return; + } + + doLocalDelete(cmd); + } + } + + // since we don't know which documents were deleted, the easiest thing to do is to invalidate + // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader + // (so cache misses will see up-to-date data) + + } finally { + vinfo.unblockUpdates(); + } + + if (returnVersions && rsp != null) { + if (deleteByQueryResponse == null) { + deleteByQueryResponse = new NamedList(); + rsp.add("deleteByQuery",deleteByQueryResponse); + } + deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion()); + } + } + + private void zkCheck() { int retries = 10; while (!zkController.isConnected()) { @@ -571,89 +756,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - private void processDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { - if (vinfo == null) { - super.processDelete(cmd); - return; - } - - // at this point, there is an update we need to try and apply. - // we may or may not be the leader. - - // Find the version - long versionOnUpdate = cmd.getVersion(); - if (versionOnUpdate == 0) { - String versionOnUpdateS = req.getParams().get(VERSION_FIELD); - versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); - } - versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version - - boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0; - boolean leaderLogic = isLeader && !isReplay; - - if (!leaderLogic && versionOnUpdate==0) { - throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader"); - } - - vinfo.blockUpdates(); - try { - - if (versionsStored) { - if (leaderLogic) { - long version = vinfo.getNewClock(); - cmd.setVersion(-version); - // TODO update versions in all buckets - } else { - cmd.setVersion(-versionOnUpdate); - - if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.deleteByQuery(cmd); - return; - } - } - } - - doLocalDelete(cmd); - - // since we don't know which documents were deleted, the easiest thing to do is to invalidate - // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader - // (so cache misses will see up-to-date data) - - } finally { - vinfo.unblockUpdates(); - } - - // TODO: we should consider this? Send delete query to everyone in the current collection - - if (zkEnabled) { - ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); - if (!params.getBool(DELQUERY_END_POINT, false)) { - params.set(DELQUERY_END_POINT, true); - - String nodeName = req.getCore().getCoreDescriptor().getCoreContainer() - .getZkController().getNodeName(); - String shardZkNodeName = nodeName + "_" + req.getCore().getName(); - List nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor() - .getCloudDescriptor().getCollectionName(), shardZkNodeName); - - if (nodes != null) { - cmdDistrib.distribDelete(cmd, nodes, params); - finish(); - } - } - } - - if (returnVersions && rsp != null) { - if (deleteByQueryResponse == null) { - deleteByQueryResponse = new NamedList(); - rsp.add("deleteByQuery",deleteByQueryResponse); - } - deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion()); - } - - } @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { @@ -718,7 +820,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } for (Map.Entry sliceEntry : slices.entrySet()) { Slice replicas = slices.get(sliceEntry.getKey()); - + Map shardMap = replicas.getShards(); for (Entry entry : shardMap.entrySet()) {