SOLR-4257: PeerSync updates and Log Replay updates should not wait for ZK connection

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1428677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2013-01-04 00:01:16 +00:00
parent dc87731a3f
commit c7d8852051
3 changed files with 34 additions and 9 deletions

View File

@ -435,6 +435,9 @@ Bug Fixes
* SOLR-4253: Misleading resource loading warning from Carrot2 clustering
component fixed (Stanisław Osiński)
* SOLR-4257: PeerSync updates and Log Replay updates should not wait for
a ZooKeeper connection in order to proceed. (yonik)
Other Changes
----------------------

View File

@ -443,10 +443,11 @@ public class PeerSync {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
// params.set("peersync",true); // debugging
params.set("peersync",true); // debugging
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
SolrQueryResponse rsp = new SolrQueryResponse();
// TODO: use the standard update processor chain now that it has support to skip processors before the DistributedUpdateProcessor?
RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
runFac.init(new NamedList());

View File

@ -140,6 +140,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private int numNodes;
private UpdateCommand updateCommand; // the current command this processor is working on.
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@ -184,6 +186,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// if we are in zk mode...
if (zkEnabled) {
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return nodes;
}
String coreName = req.getCore().getName();
String coreNodeName = zkController.getNodeName() + "_" + coreName;
@ -272,10 +280,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private void doDefensiveChecks(String shardId, DistribPhase phase) {
boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
if (isReplayOrPeersync) return;
String from = req.getParams().get("distrib.from");
boolean logReplay = req.getParams().getBool(LOG_REPLAY, false);
boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
if (!logReplay && DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
}
@ -326,6 +336,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
updateCommand = cmd;
if (zkEnabled) {
zkCheck();
nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
@ -493,8 +505,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
boolean leaderLogic = isLeader && !isReplay;
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
VersionBucket bucket = vinfo.bucket(bucketHash);
@ -690,6 +702,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
updateCommand = cmd;
if (!cmd.isDeleteById()) {
doDeleteByQuery(cmd);
return;
@ -848,8 +862,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
boolean leaderLogic = isLeader && !isReplay;
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
if (!leaderLogic && versionOnUpdate==0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@ -912,6 +926,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private void zkCheck() {
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
// for log reply or peer sync, we don't need to be connected to ZK
return;
}
if (zkController.isConnected()) {
return;
}
@ -955,8 +974,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
long signedVersionOnUpdate = versionOnUpdate;
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
boolean leaderLogic = isLeader && !isReplay;
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
if (!leaderLogic && versionOnUpdate==0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@ -1026,6 +1045,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
updateCommand = cmd;
if (zkEnabled) {
zkCheck();
}