diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 314bd1094f6..faf3e5d8cc5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -226,20 +226,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { success = true; } } - - - // if !success but no one else is in active mode, - // we are the leader anyway - // TODO: should we also be leader if there is only one other active? - // if we couldn't sync with it, it shouldn't be able to sync with us - // TODO: this needs to be moved to the election context - the logic does - // not belong here. - if (!success - && !areAnyOtherReplicasActive(zkController, leaderProps, collection, - shardId)) { - log.info("Sync was not a success but no one else is active! I am the leader"); - success = true; - } // solrcloud_debug if (log.isDebugEnabled()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 7955bd6756f..8b8f36dea8e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClosableThread; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -205,7 +206,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { } } - private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName) + private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice) throws SolrServerException, IOException { HttpSolrServer server = new HttpSolrServer(leaderBaseUrl); try { @@ -217,7 +218,9 @@ public class RecoveryStrategy extends Thread implements ClosableThread { prepCmd.setState(ZkStateReader.RECOVERING); prepCmd.setCheckLive(true); prepCmd.setOnlyIfLeader(true); - + if (!Slice.CONSTRUCTION.equals(slice.getState())) { + prepCmd.setOnlyIfLeaderActive(true); + } server.request(prepCmd); } finally { server.shutdown(); @@ -364,7 +367,8 @@ public class RecoveryStrategy extends Thread implements ClosableThread { zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING); - sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName); + Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId()); + sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice); // we wait a bit so that any updates on the leader // that started before they saw recovering state diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index d291eb9ba55..e28c321b77e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -219,10 +219,6 @@ public final class ZkController { // seems we dont need to do this again... // Overseer.createClientNodes(zkClient, getNodeName()); - ShardHandler shardHandler; - String adminPath; - shardHandler = cc.getShardHandlerFactory().getShardHandler(); - adminPath = cc.getAdminPath(); cc.cancelCoreRecoveries(); @@ -739,6 +735,8 @@ public final class ZkController { * @return the shardId for the SolrCore */ public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception { + // pre register has published our down state + final String baseUrl = getBaseUrl(); final CloudDescriptor cloudDesc = desc.getCloudDescriptor(); @@ -796,9 +794,6 @@ public final class ZkController { // TODO: should this be moved to another thread? To recoveryStrat? // TODO: should this actually be done earlier, before (or as part of) // leader election perhaps? - // TODO: if I'm the leader, ensure that a replica that is trying to recover waits until I'm - // active (or don't make me the - // leader until my local replay is done. UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); if (!core.isReloaded() && ulog != null) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index c088a5214ee..82bb4fec617 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -256,7 +256,7 @@ public class CoreContainer { preRegisterInZk(cd); } c = create(cd); - registerCore(cd.isTransient(), name, c, false); + registerCore(cd.isTransient(), name, c, false, false); } catch (Throwable t) { /* if (isZooKeeperAware()) { try { @@ -316,6 +316,20 @@ public class CoreContainer { ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor); } } + + if (isZooKeeperAware()) { + // register in zk in background threads + Collection cores = getCores(); + if (cores != null) { + for (SolrCore core : cores) { + try { + zkSys.registerInZk(core, true); + } catch (Throwable t) { + SolrException.log(log, "Error registering SolrCore", t); + } + } + } + } } private static void checkForDuplicateCoreNames(List cds) { @@ -434,6 +448,10 @@ public class CoreContainer { } protected SolrCore registerCore(boolean isTransientCore, String name, SolrCore core, boolean returnPrevNotClosed) { + return registerCore(isTransientCore, name, core, returnPrevNotClosed, true); + } + + protected SolrCore registerCore(boolean isTransientCore, String name, SolrCore core, boolean returnPrevNotClosed, boolean registerInZk) { if( core == null ) { throw new RuntimeException( "Can not register a null core." ); } @@ -476,7 +494,9 @@ public class CoreContainer { if( old == null || old == core) { log.info( "registering core: "+name ); - zkSys.registerInZk(core); + if (registerInZk) { + zkSys.registerInZk(core, false); + } return null; } else { @@ -484,7 +504,9 @@ public class CoreContainer { if (!returnPrevNotClosed) { old.close(); } - zkSys.registerInZk(core); + if (registerInZk) { + zkSys.registerInZk(core, false); + } return old; } } diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 6b281d8fa47..5cf08ee39a5 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -24,8 +24,10 @@ import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchemaFactory; +import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.SystemIdResolver; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -38,6 +40,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; public class ZkContainer { @@ -45,12 +49,9 @@ public class ZkContainer { protected ZkController zkController; private SolrZkServer zkServer; - private int zkClientTimeout; - private String hostPort; - private String hostContext; - private String host; - private int leaderVoteWait; - private Boolean genericCoreNodeNames; + + private ExecutorService coreZkRegister = Executors.newFixedThreadPool(Integer.MAX_VALUE, + new DefaultSolrThreadFactory("coreZkRegister") ); public ZkContainer() { @@ -82,13 +83,6 @@ public class ZkContainer { String zkRun = System.getProperty("zkRun"); - this.zkClientTimeout = zkClientTimeout; - this.hostPort = hostPort; - this.hostContext = hostContext; - this.host = host; - this.leaderVoteWait = leaderVoteWait; - this.genericCoreNodeNames = genericCoreNodeNames; - if (zkRun == null && zookeeperHost == null) return; // not in zk mode @@ -239,34 +233,36 @@ public class ZkContainer { } } - public void registerInZk(SolrCore core) { - if (zkController != null) { - try { - zkController.register(core.getName(), core.getCoreDescriptor()); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - SolrException.log(log, "", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", - e); - } catch (Exception e) { - // if register fails, this is really bad - close the zkController to - // minimize any damage we can cause - try { - zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN); - } catch (KeeperException e1) { - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + public void registerInZk(final SolrCore core, boolean background) { + Thread thread = new Thread() { + @Override + public void run() { + try { + zkController.register(core.getName(), core.getCoreDescriptor()); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + SolrException.log(log, "", e); + } catch (Exception e) { + try { + zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + log.error("", e1); + } catch (Exception e1) { + log.error("", e1); + } + SolrException.log(log, "", e); + } } - SolrException.log(log, "", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", - e); + + }; + + if (zkController != null) { + if (background) { + coreZkRegister.execute(thread); + } else { + thread.run(); } } } @@ -309,12 +305,20 @@ public class ZkContainer { } public void close() { - if (zkController != null) { - zkController.close(); + + try { + if (zkController != null) { + zkController.close(); + } + } finally { + try { + if (zkServer != null) { + zkServer.stop(); + } + } finally { + ExecutorUtil.shutdownNowAndAwaitTermination(coreZkRegister); + } } - if (zkServer != null) { - zkServer.stop(); - } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index 3b4cb48fa06..952ee593a3e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -864,6 +864,7 @@ public class CoreAdminHandler extends RequestHandlerBase { String waitForState = params.get("state"); Boolean checkLive = params.getBool("checkLive"); Boolean onlyIfLeader = params.getBool("onlyIfLeader"); + Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive"); log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader); @@ -906,6 +907,11 @@ public class CoreAdminHandler extends RequestHandlerBase { if (nodeProps != null) { state = nodeProps.getStr(ZkStateReader.STATE_PROP); live = clusterState.liveNodesContain(nodeName); + + String localState = cloudDescriptor.getLastPublished(); + if (onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE))) { + continue; + } if (nodeProps != null && state.equals(waitForState)) { if (checkLive == null) { break; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java index 9543ccd4c9e..506f3fd1a16 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java @@ -156,7 +156,7 @@ public class CoreAdminRequest extends SolrRequest protected String state; protected Boolean checkLive; protected Boolean onlyIfLeader; - + protected Boolean onlyIfLeaderActive; public WaitForState() { action = CoreAdminAction.PREPRECOVERY; @@ -202,6 +202,10 @@ public class CoreAdminRequest extends SolrRequest this.onlyIfLeader = onlyIfLeader; } + public void setOnlyIfLeaderActive(boolean onlyIfLeaderActive) { + this.onlyIfLeaderActive = onlyIfLeaderActive; + } + @Override public SolrParams getParams() { if( action == null ) { @@ -231,6 +235,10 @@ public class CoreAdminRequest extends SolrRequest if (onlyIfLeader != null) { params.set( "onlyIfLeader", onlyIfLeader); } + + if (onlyIfLeaderActive != null) { + params.set( "onlyIfLeaderActive", onlyIfLeaderActive); + } return params; }