solrcloud: send deleteByQuery to all shard leaders to version and forward to replicas

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1243768 13f79535-47bb-0310-9956-ffa450edef68
Yonik Seeley 2012-02-14 02:45:41 +00:00
parent dd13217dee
commit 27f5a8b290
1 changed file with 201 additions and 99 deletions
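In outline, the commit replaces the old broadcast-style deleteByQuery with a three-level forwarding scheme keyed off a dbq_level request parameter: level 1 is the first node to receive the request, level 2 a shard leader, level 3 a replica. The following self-contained sketch (not part of the commit; every name in it is a hypothetical stub) shows the control flow that doDeleteByQuery implements in the diff below:

// A sketch of the dbq_level control flow; all helpers are hypothetical stubs,
// not the commit's code.
public class DbqRoutingSketch {

  void route(int dbqLevel, boolean leaderForSomeShard) {
    if (dbqLevel == 1) {
      // Level 1: first node to see the DBQ. Forward it, marked dbq_level=2,
      // to the leader of every shard, then continue only if this node itself
      // leads one of those shards.
      forwardToAllShardLeaders();
      if (!leaderForSomeShard) return;
      dbqLevel = 2;
    }
    if (dbqLevel == 2) {
      // Level 2: shard leader. Block concurrent updates, assign a version
      // (negative marks a delete), apply locally, forward to replicas with
      // dbq_level=3, and wait for their responses before unblocking.
      blockUpdates();
      try {
        long version = -newClock();
        applyLocally(version);
        forwardToReplicas(version);
        waitForReplicaResponses();
      } finally {
        unblockUpdates();
      }
    } else {
      // Level 3: replica. Just log + execute the DBQ with the leader's version.
      applyLocally(versionFromRequest());
    }
  }

  // Stubs standing in for the real SolrCloud machinery:
  void forwardToAllShardLeaders() {}
  void blockUpdates() {}
  void unblockUpdates() {}
  long newClock() { return System.nanoTime(); }
  long versionFromRequest() { return 0L; }
  void applyLocally(long version) {}
  void forwardToReplicas(long version) {}
  void waitForReplicaResponses() {}
}

Holding the update block while the leader versions and forwards the DBQ is what prevents an add from being reordered around the delete on a replica.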


@@ -60,14 +60,15 @@ import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.VersionBucket;
 import org.apache.solr.update.VersionInfo;
+import org.apache.zookeeper.KeeperException;
 
 // NOT mt-safe... create a new processor for each add thread
 // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
 public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   public static final String SEEN_LEADER = "leader";
   public static final String COMMIT_END_POINT = "commit_end_point";
-  public static final String DELQUERY_END_POINT = "delquery_end_point";
+  public static final String DELETE_BY_QUERY_LEVEL = "dbq_level";
   private final SolrQueryRequest req;
   private final SolrQueryResponse rsp;
   private final UpdateRequestProcessor next;
@@ -91,6 +92,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private boolean zkEnabled = false;
+  private CloudDescriptor cloudDesc;
   private String collection;
   private ZkController zkController;
@@ -128,7 +130,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
-    CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+    cloudDesc = coreDesc.getCloudDescriptor();
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
@@ -192,7 +194,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return nodes;
   }
 
   private String getShard(int hash, String collection, CloudState cloudState) {
     // ranges should be part of the cloud state and eventually gotten from zk
@@ -200,6 +203,43 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return cloudState.getShard(hash, collection);
   }
 
+  // used for deleteByQuery to get the list of nodes this leader should forward to
+  private List<Node> setupRequest() {
+    List<Node> nodes = null;
+    String shardId = cloudDesc.getShardId();
+
+    try {
+      ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
+          collection, shardId));
+
+      String leaderNodeName = leaderProps.getCoreNodeName();
+      String coreName = req.getCore().getName();
+      String coreNodeName = zkController.getNodeName() + "_" + coreName;
+      isLeader = coreNodeName.equals(leaderNodeName);
+
+      // TODO: what if we are no longer the leader?
+      forwardToLeader = false;
+
+      List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+          .getReplicaProps(collection, shardId, zkController.getNodeName(), coreName);
+      if (replicaProps != null) {
+        nodes = new ArrayList<Node>(replicaProps.size());
+        for (ZkCoreNodeProps props : replicaProps) {
+          nodes.add(new StdNode(props));
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+
+    return nodes;
+  }
 
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
     // TODO: check for id field?
@@ -419,17 +459,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
     if (!cmd.isDeleteById()) {
-      // delete by query...
-      // TODO: handle versioned and distributed deleteByQuery
-      // even in non zk mode, tests simulate updates from a leader
-      if(!zkEnabled) {
-        isLeader = !req.getParams().getBool(SEEN_LEADER, false);
-      } else {
-        zkCheck();
-      }
-      processDeleteByQuery(cmd);
+      doDeleteByQuery(cmd);
       return;
     }
@@ -475,6 +505,161 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    // even in non zk mode, tests simulate updates from a leader
+    if (!zkEnabled) {
+      isLeader = !req.getParams().getBool(SEEN_LEADER, false);
+    } else {
+      zkCheck();
+    }
+
+    // Lev1: we are the first to receive this deleteByQuery, it must be forwarded to the leader of every shard
+    // Lev2: we are a leader receiving a forwarded deleteByQuery... we must:
+    //       - block all updates (use VersionInfo)
+    //       - flush *all* updates going to our replicas
+    //       - forward the DBQ to our replicas and wait for the response
+    //       - log + execute the local DBQ
+    // Lev3: we are a replica receiving a DBQ from our leader
+    //       - log + execute the local DBQ
+    int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1);
+
+    if (zkEnabled && dbqlevel == 1) {
+      boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
+
+      Map<String,Slice> slices = zkController.getCloudState().getSlices(collection);
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      params.set(DELETE_BY_QUERY_LEVEL, 2);
+
+      List<Node> leaders = new ArrayList<Node>(slices.size());
+      for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
+        String sliceName = sliceEntry.getKey();
+        ZkNodeProps leaderProps;
+        try {
+          leaderProps = zkController.getZkStateReader().getLeaderProps(collection, sliceName);
+        } catch (InterruptedException e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + sliceName, e);
+        }
+
+        // TODO: What if leaders changed in the meantime?
+        // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader"
+        // (or we get an error because it went down) then look up the new leader?
+
+        // Am I the leader for this slice?
+        ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leaderProps);
+        String leaderNodeName = coreLeaderProps.getCoreNodeName();
+        String coreName = req.getCore().getName();
+        String coreNodeName = zkController.getNodeName() + "_" + coreName;
+        isLeader = coreNodeName.equals(leaderNodeName);
+
+        if (isLeader) {
+          // don't forward to ourself
+          leaderForAnyShard = true;
+        } else {
+          leaders.add(new StdNode(coreLeaderProps));
+        }
+      }
+
+      cmdDistrib.distribDelete(cmd, leaders, params);
+
+      if (!leaderForAnyShard) {
+        return;
+      }
+
+      // change the level to 2 so we look up and forward to our own replicas (if any)
+      dbqlevel = 2;
+    }
+
+    List<Node> replicas = null;
+    if (zkEnabled && dbqlevel == 2) {
+      // This core should be a leader
+      replicas = setupRequest();
+    }
+
+    if (vinfo == null) {
+      super.processDelete(cmd);
+      return;
+    }
+
+    // at this point, there is an update we need to try and apply.
+    // we may or may not be the leader.
+
+    // Find the version
+    long versionOnUpdate = cmd.getVersion();
+    if (versionOnUpdate == 0) {
+      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
+      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
+    }
+    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
+
+    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
+    boolean leaderLogic = isLeader && !isReplay;
+
+    if (!leaderLogic && versionOnUpdate == 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+    }
+
+    vinfo.blockUpdates();
+    try {
+      if (versionsStored) {
+        if (leaderLogic) {
+          long version = vinfo.getNewClock();
+          cmd.setVersion(-version);
+          // TODO update versions in all buckets
+          // TODO: flush any adds to these replicas so they do not get reordered w.r.t. this DBQ
+
+          doLocalDelete(cmd);
+
+          // forward to all replicas
+          if (replicas != null) {
+            ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+            params.set(DELETE_BY_QUERY_LEVEL, 3);
+            params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
+            params.set(SEEN_LEADER, "true");
+            cmdDistrib.distribDelete(cmd, replicas, params);
+
+            // wait for DBQ responses before releasing the update block to eliminate the possibility
+            // of an add being reordered.
+            // TODO: this isn't strictly necessary - we could do the same thing we do for PeerSync
+            // in DUH2 and add a clause that prevents deleting older docs.
+            cmdDistrib.finish();
+          }
+        } else {
+          cmd.setVersion(-versionOnUpdate);
+
+          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.deleteByQuery(cmd);
+            return;
+          }
+
+          doLocalDelete(cmd);
+        }
+      }
+
+      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
+      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
+      // (so cache misses will see up-to-date data)
+    } finally {
+      vinfo.unblockUpdates();
+    }
+
+    if (returnVersions && rsp != null) {
+      if (deleteByQueryResponse == null) {
+        deleteByQueryResponse = new NamedList<String>();
+        rsp.add("deleteByQuery", deleteByQueryResponse);
+      }
+      deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
+    }
+  }
 
   private void zkCheck() {
     int retries = 10;
     while (!zkController.isConnected()) {
@@ -571,89 +756,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
-  private void processDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
-    if (vinfo == null) {
-      super.processDelete(cmd);
-      return;
-    }
-
-    // at this point, there is an update we need to try and apply.
-    // we may or may not be the leader.
-
-    // Find the version
-    long versionOnUpdate = cmd.getVersion();
-    if (versionOnUpdate == 0) {
-      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
-      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
-    }
-    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
-
-    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
-    boolean leaderLogic = isLeader && !isReplay;
-
-    if (!leaderLogic && versionOnUpdate==0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
-    }
-
-    vinfo.blockUpdates();
-    try {
-      if (versionsStored) {
-        if (leaderLogic) {
-          long version = vinfo.getNewClock();
-          cmd.setVersion(-version);
-          // TODO update versions in all buckets
-        } else {
-          cmd.setVersion(-versionOnUpdate);
-
-          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-            // we're not in an active state, and this update isn't from a replay, so buffer it.
-            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-            ulog.deleteByQuery(cmd);
-            return;
-          }
-        }
-      }
-
-      doLocalDelete(cmd);
-
-      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
-      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
-      // (so cache misses will see up-to-date data)
-    } finally {
-      vinfo.unblockUpdates();
-    }
-
-    // TODO: we should consider this? Send delete query to everyone in the current collection
-    if (zkEnabled) {
-      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
-      if (!params.getBool(DELQUERY_END_POINT, false)) {
-        params.set(DELQUERY_END_POINT, true);
-        String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
-            .getZkController().getNodeName();
-        String shardZkNodeName = nodeName + "_" + req.getCore().getName();
-        List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
-            .getCloudDescriptor().getCollectionName(), shardZkNodeName);
-
-        if (nodes != null) {
-          cmdDistrib.distribDelete(cmd, nodes, params);
-          finish();
-        }
-      }
-    }
-
-    if (returnVersions && rsp != null) {
-      if (deleteByQueryResponse == null) {
-        deleteByQueryResponse = new NamedList<String>();
-        rsp.add("deleteByQuery",deleteByQueryResponse);
-      }
-      deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
-    }
-  }
 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
@@ -718,7 +820,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   }
 
   for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
     Slice replicas = slices.get(sliceEntry.getKey());
     Map<String,ZkNodeProps> shardMap = replicas.getShards();
     for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
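
For context, a client never sets dbq_level or _version_ itself; it issues an ordinary deleteByQuery and the receiving node drives the forwarding. A minimal SolrJ call of that era might look like this sketch (the ZooKeeper address and collection name are assumptions):

import org.apache.solr.client.solrj.impl.CloudSolrServer;

public class DeleteByQueryClientSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical client usage; dbq_level and _version_ are internal
    // parameters filled in server-side as the request is forwarded.
    CloudSolrServer server = new CloudSolrServer("localhost:2181");  // assumed ZK address
    server.setDefaultCollection("collection1");                      // assumed collection
    server.deleteByQuery("category:discontinued");  // versioned and fanned out by the cluster
    server.commit();
    server.shutdown();
  }
}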