mirror of https://github.com/apache/lucene.git
SOLR-5552: Leader recovery process can select the wrong leader if all replicas for a shard are down and trying to recover as well as lose updates that should have been recovered.
SOLR-5569 A replica should not try and recover from a leader until it has published that it is ACTIVE. SOLR-5568 A SolrCore cannot decide to be the leader just because the cluster state says no other SolrCore's are active. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1553031 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5ac5bdbc47
commit
e7d67c783b
|
@ -227,20 +227,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// if !success but no one else is in active mode,
|
||||
// we are the leader anyway
|
||||
// TODO: should we also be leader if there is only one other active?
|
||||
// if we couldn't sync with it, it shouldn't be able to sync with us
|
||||
// TODO: this needs to be moved to the election context - the logic does
|
||||
// not belong here.
|
||||
if (!success
|
||||
&& !areAnyOtherReplicasActive(zkController, leaderProps, collection,
|
||||
shardId)) {
|
||||
log.info("Sync was not a success but no one else is active! I am the leader");
|
||||
success = true;
|
||||
}
|
||||
|
||||
// solrcloud_debug
|
||||
if (log.isDebugEnabled()) {
|
||||
try {
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
|
|||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClosableThread;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
@ -205,7 +206,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
|||
}
|
||||
}
|
||||
|
||||
private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName)
|
||||
private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
|
||||
throws SolrServerException, IOException {
|
||||
HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
|
||||
try {
|
||||
|
@ -217,7 +218,9 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
|||
prepCmd.setState(ZkStateReader.RECOVERING);
|
||||
prepCmd.setCheckLive(true);
|
||||
prepCmd.setOnlyIfLeader(true);
|
||||
|
||||
if (!Slice.CONSTRUCTION.equals(slice.getState())) {
|
||||
prepCmd.setOnlyIfLeaderActive(true);
|
||||
}
|
||||
server.request(prepCmd);
|
||||
} finally {
|
||||
server.shutdown();
|
||||
|
@ -364,7 +367,8 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
|||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
|
||||
|
||||
|
||||
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
|
||||
Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
||||
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
|
||||
|
||||
// we wait a bit so that any updates on the leader
|
||||
// that started before they saw recovering state
|
||||
|
|
|
@ -219,10 +219,6 @@ public final class ZkController {
|
|||
|
||||
// seems we dont need to do this again...
|
||||
// Overseer.createClientNodes(zkClient, getNodeName());
|
||||
ShardHandler shardHandler;
|
||||
String adminPath;
|
||||
shardHandler = cc.getShardHandlerFactory().getShardHandler();
|
||||
adminPath = cc.getAdminPath();
|
||||
|
||||
cc.cancelCoreRecoveries();
|
||||
|
||||
|
@ -739,6 +735,8 @@ public final class ZkController {
|
|||
* @return the shardId for the SolrCore
|
||||
*/
|
||||
public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {
|
||||
// pre register has published our down state
|
||||
|
||||
final String baseUrl = getBaseUrl();
|
||||
|
||||
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
|
||||
|
@ -796,9 +794,6 @@ public final class ZkController {
|
|||
// TODO: should this be moved to another thread? To recoveryStrat?
|
||||
// TODO: should this actually be done earlier, before (or as part of)
|
||||
// leader election perhaps?
|
||||
// TODO: if I'm the leader, ensure that a replica that is trying to recover waits until I'm
|
||||
// active (or don't make me the
|
||||
// leader until my local replay is done.
|
||||
|
||||
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
|
||||
if (!core.isReloaded() && ulog != null) {
|
||||
|
|
|
@ -256,7 +256,7 @@ public class CoreContainer {
|
|||
preRegisterInZk(cd);
|
||||
}
|
||||
c = create(cd);
|
||||
registerCore(cd.isTransient(), name, c, false);
|
||||
registerCore(cd.isTransient(), name, c, false, false);
|
||||
} catch (Throwable t) {
|
||||
/* if (isZooKeeperAware()) {
|
||||
try {
|
||||
|
@ -316,6 +316,20 @@ public class CoreContainer {
|
|||
ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
|
||||
}
|
||||
}
|
||||
|
||||
if (isZooKeeperAware()) {
|
||||
// register in zk in background threads
|
||||
Collection<SolrCore> cores = getCores();
|
||||
if (cores != null) {
|
||||
for (SolrCore core : cores) {
|
||||
try {
|
||||
zkSys.registerInZk(core, true);
|
||||
} catch (Throwable t) {
|
||||
SolrException.log(log, "Error registering SolrCore", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkForDuplicateCoreNames(List<CoreDescriptor> cds) {
|
||||
|
@ -434,6 +448,10 @@ public class CoreContainer {
|
|||
}
|
||||
|
||||
protected SolrCore registerCore(boolean isTransientCore, String name, SolrCore core, boolean returnPrevNotClosed) {
|
||||
return registerCore(isTransientCore, name, core, returnPrevNotClosed, true);
|
||||
}
|
||||
|
||||
protected SolrCore registerCore(boolean isTransientCore, String name, SolrCore core, boolean returnPrevNotClosed, boolean registerInZk) {
|
||||
if( core == null ) {
|
||||
throw new RuntimeException( "Can not register a null core." );
|
||||
}
|
||||
|
@ -476,7 +494,9 @@ public class CoreContainer {
|
|||
|
||||
if( old == null || old == core) {
|
||||
log.info( "registering core: "+name );
|
||||
zkSys.registerInZk(core);
|
||||
if (registerInZk) {
|
||||
zkSys.registerInZk(core, false);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
|
@ -484,7 +504,9 @@ public class CoreContainer {
|
|||
if (!returnPrevNotClosed) {
|
||||
old.close();
|
||||
}
|
||||
zkSys.registerInZk(core);
|
||||
if (registerInZk) {
|
||||
zkSys.registerInZk(core, false);
|
||||
}
|
||||
return old;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.apache.solr.cloud.ZkSolrResourceLoader;
|
|||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.schema.IndexSchema;
|
||||
import org.apache.solr.schema.IndexSchemaFactory;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.SystemIdResolver;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -38,6 +40,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class ZkContainer {
|
||||
|
@ -45,12 +49,9 @@ public class ZkContainer {
|
|||
|
||||
protected ZkController zkController;
|
||||
private SolrZkServer zkServer;
|
||||
private int zkClientTimeout;
|
||||
private String hostPort;
|
||||
private String hostContext;
|
||||
private String host;
|
||||
private int leaderVoteWait;
|
||||
private Boolean genericCoreNodeNames;
|
||||
|
||||
private ExecutorService coreZkRegister = Executors.newFixedThreadPool(Integer.MAX_VALUE,
|
||||
new DefaultSolrThreadFactory("coreZkRegister") );
|
||||
|
||||
public ZkContainer() {
|
||||
|
||||
|
@ -82,13 +83,6 @@ public class ZkContainer {
|
|||
|
||||
String zkRun = System.getProperty("zkRun");
|
||||
|
||||
this.zkClientTimeout = zkClientTimeout;
|
||||
this.hostPort = hostPort;
|
||||
this.hostContext = hostContext;
|
||||
this.host = host;
|
||||
this.leaderVoteWait = leaderVoteWait;
|
||||
this.genericCoreNodeNames = genericCoreNodeNames;
|
||||
|
||||
if (zkRun == null && zookeeperHost == null)
|
||||
return; // not in zk mode
|
||||
|
||||
|
@ -239,34 +233,36 @@ public class ZkContainer {
|
|||
}
|
||||
}
|
||||
|
||||
public void registerInZk(SolrCore core) {
|
||||
if (zkController != null) {
|
||||
public void registerInZk(final SolrCore core, boolean background) {
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
zkController.register(core.getName(), core.getCoreDescriptor());
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
SolrException.log(log, "", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
||||
e);
|
||||
} catch (Exception e) {
|
||||
// if register fails, this is really bad - close the zkController to
|
||||
// minimize any damage we can cause
|
||||
try {
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
|
||||
} catch (KeeperException e1) {
|
||||
log.error("", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"", e);
|
||||
} catch (InterruptedException e1) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"", e);
|
||||
log.error("", e1);
|
||||
} catch (Exception e1) {
|
||||
log.error("", e1);
|
||||
}
|
||||
SolrException.log(log, "", e);
|
||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
if (zkController != null) {
|
||||
if (background) {
|
||||
coreZkRegister.execute(thread);
|
||||
} else {
|
||||
thread.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -309,12 +305,20 @@ public class ZkContainer {
|
|||
}
|
||||
|
||||
public void close() {
|
||||
|
||||
try {
|
||||
if (zkController != null) {
|
||||
zkController.close();
|
||||
}
|
||||
|
||||
} finally {
|
||||
try {
|
||||
if (zkServer != null) {
|
||||
zkServer.stop();
|
||||
}
|
||||
} finally {
|
||||
ExecutorUtil.shutdownNowAndAwaitTermination(coreZkRegister);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -864,6 +864,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
String waitForState = params.get("state");
|
||||
Boolean checkLive = params.getBool("checkLive");
|
||||
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
|
||||
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
|
||||
|
||||
log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
|
||||
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader);
|
||||
|
@ -906,6 +907,11 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
if (nodeProps != null) {
|
||||
state = nodeProps.getStr(ZkStateReader.STATE_PROP);
|
||||
live = clusterState.liveNodesContain(nodeName);
|
||||
|
||||
String localState = cloudDescriptor.getLastPublished();
|
||||
if (onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE))) {
|
||||
continue;
|
||||
}
|
||||
if (nodeProps != null && state.equals(waitForState)) {
|
||||
if (checkLive == null) {
|
||||
break;
|
||||
|
|
|
@ -156,7 +156,7 @@ public class CoreAdminRequest extends SolrRequest
|
|||
protected String state;
|
||||
protected Boolean checkLive;
|
||||
protected Boolean onlyIfLeader;
|
||||
|
||||
protected Boolean onlyIfLeaderActive;
|
||||
|
||||
public WaitForState() {
|
||||
action = CoreAdminAction.PREPRECOVERY;
|
||||
|
@ -202,6 +202,10 @@ public class CoreAdminRequest extends SolrRequest
|
|||
this.onlyIfLeader = onlyIfLeader;
|
||||
}
|
||||
|
||||
public void setOnlyIfLeaderActive(boolean onlyIfLeaderActive) {
|
||||
this.onlyIfLeaderActive = onlyIfLeaderActive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
if( action == null ) {
|
||||
|
@ -232,6 +236,10 @@ public class CoreAdminRequest extends SolrRequest
|
|||
params.set( "onlyIfLeader", onlyIfLeader);
|
||||
}
|
||||
|
||||
if (onlyIfLeaderActive != null) {
|
||||
params.set( "onlyIfLeaderActive", onlyIfLeaderActive);
|
||||
}
|
||||
|
||||
return params;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue