diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index e12ddfddec8..f24bd6e0816 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -103,17 +103,12 @@ Bug Fixes * SOLR-3770: Overseer may lose updates to cluster state (siren) -* SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at - the same time if the recovery thread join call was interrupted. +* SOLR-3721: Fix bug that could theoretically allow multiple recoveries to run + briefly at the same time if the recovery thread join call was interrupted. (Per Steffensen, Mark Miller) - -* SOLR-3750: On session expiration, we should explicitly wait some time before - running the leader sync process so that we are sure every node participates. - (Per Steffensen, Mark Miller) - -* SOLR-3772: On cluster startup, we should wait until we see all registered - replicas before running the leader process - or if they all do not come up, - N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller) + +* SOLR-3782: A leader going down while updates are coming in can cause shard + inconsistency. (Mark Miller) Other Changes ---------------------- @@ -144,7 +139,14 @@ Other Changes * SOLR-3780: Maven build: Make solrj tests run separately from solr-core. (Steve Rowe) + +* SOLR-3772: Optionally, on cluster startup, we can wait until we see all registered + replicas before running the leader process - or if they all do not come up, + N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller) +* SOLR-3750: Optionaly, on session expiration, we can explicitly wait some time before + running the leader sync process so that we are sure every node participates. + (Per Steffensen, Mark Miller) ================== 4.0.0-BETA =================== diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 954cd4c51b1..82dbb4e16ce 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -55,6 +55,8 @@ public abstract class ElectionContext { this.zkClient = zkClient; } + public void close() {} + public void cancelElection() throws InterruptedException, KeeperException { zkClient.delete(leaderSeqPath, -1, true); } @@ -83,10 +85,6 @@ class ShardLeaderElectionContextBase extends ElectionContext { @Override void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException { - // this pause is important - // but I don't know why yet :*( - it must come before this publish call - // and can happen at the start of leader election process even - Thread.sleep(100); zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL, true); @@ -112,6 +110,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { private SyncStrategy syncStrategy = new SyncStrategy(); private boolean afterExpiration; + + private volatile boolean isClosed = false; public ShardLeaderElectionContext(LeaderElector leaderElector, final String shardId, final String collection, @@ -123,9 +123,16 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { this.afterExpiration = afterExpiration; } + @Override + public void close() { + this.isClosed = true; + } + @Override void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException { + log.info("Running the leader process. afterExpiration=" + afterExpiration); + String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP); // clear the leader in clusterstate @@ -134,21 +141,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { collection); Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m)); - waitForReplicasToComeUp(weAreReplacement); - - // wait for local leader state to clear... - // int tries = 0; - // while (zkController.getClusterState().getLeader(collection, shardId) != - // null) { - // System.out.println("leader still shown " + tries + " " + - // zkController.getClusterState().getLeader(collection, shardId)); - // Thread.sleep(1000); - // tries++; - // if (tries == 30) { - // break; - // } - // } - // Thread.sleep(1000); + String leaderVoteWait = cc.getZkController().getLeaderVoteWait(); + if (leaderVoteWait != null) { + waitForReplicasToComeUp(weAreReplacement, leaderVoteWait); + } SolrCore core = null; try { @@ -238,14 +234,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } - private void waitForReplicasToComeUp(boolean weAreReplacement) + private void waitForReplicasToComeUp(boolean weAreReplacement, String leaderVoteWait) throws InterruptedException { - int retries = 300; // ~ 5 min + int timeout = Integer.parseInt(leaderVoteWait); + long timeoutAt = System.currentTimeMillis() + timeout; + boolean tryAgain = true; Slice slices = zkController.getClusterState().getSlice(collection, shardId); - log.info("Running the leader process. afterExperiation=" + afterExpiration); - while (tryAgain || slices == null) { - + while (true && !isClosed) { // wait for everyone to be up if (slices != null) { Map shards = slices.getShards(); @@ -265,24 +261,23 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { if ((afterExpiration || !weAreReplacement) && found >= slices.getShards().size()) { log.info("Enough replicas found to continue."); - tryAgain = false; + break; } else if (!afterExpiration && found >= slices.getShards().size() - 1) { // a previous leader went down - wait for one less than the total // known shards log.info("Enough replicas found to continue."); - tryAgain = false; + break; } else { - log.info("Waiting until we see more replicas up"); + log.info("Waiting until we see more replicas up: total=" + slices.getShards().size() + " found=" + found + " timeoutin=" + (timeoutAt - System.currentTimeMillis())); } - - retries--; - if (retries == 0) { + + if (System.currentTimeMillis() > timeoutAt) { log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later"); break; } } if (tryAgain) { - Thread.sleep(1000); + Thread.sleep(500); slices = zkController.getClusterState().getSlice(collection, shardId); } } @@ -306,6 +301,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) { log.info("Checking if I should try and be the leader."); + + if (isClosed) { + log.info("Bailing on leader process because we have been closed"); + return false; + } + ClusterState clusterState = zkController.getZkStateReader().getClusterState(); Map slices = clusterState.getSlices(this.collection); Slice slice = slices.get(shardId); diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 7b09abb1afe..69f8577ae09 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -65,8 +65,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread { private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class); + public static interface RecoveryListener { + public void recovered(); + public void failed(); + } + private volatile boolean close = false; + private RecoveryListener recoveryListener; private ZkController zkController; private String baseUrl; private String coreZkNodeName; @@ -76,9 +82,10 @@ public class RecoveryStrategy extends Thread implements ClosableThread { private boolean recoveringAfterStartup; private CoreContainer cc; - public RecoveryStrategy(CoreContainer cc, String name) { + public RecoveryStrategy(CoreContainer cc, String name, RecoveryListener recoveryListener) { this.cc = cc; this.coreName = name; + this.recoveryListener = recoveryListener; setName("RecoveryThread"); zkController = cc.getZkController(); zkStateReader = zkController.getZkStateReader(); @@ -93,7 +100,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { // make sure any threads stop retrying public void close() { close = true; - log.warn("Stopping recovery for core " + coreName + " zkNodeName=" + coreZkNodeName); + log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName ); } @@ -105,6 +112,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { zkController.publish(cd, ZkStateReader.RECOVERY_FAILED); } finally { close(); + recoveryListener.failed(); } } @@ -210,15 +218,15 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try { doRecovery(core); - } catch (KeeperException e) { - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); - } catch (InterruptedException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); SolrException.log(log, "", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (Throwable t) { + log.error("", t); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, + "", t); } } finally { if (core != null) core.close(); @@ -258,38 +266,52 @@ public class RecoveryStrategy extends Thread implements ClosableThread { List startingVersions = ulog.getStartingVersions(); - if (startingVersions != null && recoveringAfterStartup) { - int oldIdx = 0; // index of the start of the old list in the current list - long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0; - - for (; oldIdx 0 ? startingVersions + .get(0) : 0; + + for (; oldIdx < recentVersions.size(); oldIdx++) { + if (recentVersions.get(oldIdx) == firstStartingVersion) break; + } + + if (oldIdx > 0) { + log.info("####### Found new versions added after startup: num=" + + oldIdx); + log.info("###### currentVersions=" + recentVersions); + } + + log.info("###### startupVersions=" + startingVersions); + } catch (Throwable t) { + SolrException.log(log, "Error getting recent versions. core=" + coreName, t); + recentVersions = new ArrayList(0); } - - if (oldIdx > 0) { - log.info("####### Found new versions added after startup: num=" + oldIdx); - log.info("###### currentVersions=" + recentVersions); - } - - log.info("###### startupVersions=" + startingVersions); } if (recoveringAfterStartup) { // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were // when we went down. We may have received updates since then. recentVersions = startingVersions; - - if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) { - // last operation at the time of startup had the GAP flag set... - // this means we were previously doing a full index replication - // that probably didn't complete and buffering updates in the meantime. - log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + coreName); - firstTime = false; // skip peersync + try { + if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) { + // last operation at the time of startup had the GAP flag set... + // this means we were previously doing a full index replication + // that probably didn't complete and buffering updates in the + // meantime. + log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + + coreName); + firstTime = false; // skip peersync + } + } catch (Throwable t) { + SolrException.log(log, "Error trying to get ulog starting operation. core=" + + coreName, t); + firstTime = false; // skip peersync } } - while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though + while (!successfulRecovery && !isInterrupted()) { // don't use interruption or it will close channels though try { CloudDescriptor cloudDesc = core.getCoreDescriptor() .getCloudDescriptor(); @@ -393,6 +415,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE); close = true; successfulRecovery = true; + recoveryListener.recovered(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("Recovery was interrupted", e); @@ -421,10 +444,17 @@ public class RecoveryStrategy extends Thread implements ClosableThread { try { log.error("Recovery failed - trying again... core=" + coreName); + + if (isClosed()) { + retries = INTERRUPTED; + } + retries++; if (retries >= MAX_RETRIES) { if (retries == INTERRUPTED) { - + SolrException.log(log, "Recovery failed - interrupted. core=" + coreName); + recoveryFailed(core, zkController, baseUrl, coreZkNodeName, + core.getCoreDescriptor()); } else { SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName); recoveryFailed(core, zkController, baseUrl, coreZkNodeName, @@ -433,7 +463,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { break; } - } catch (Exception e) { + } catch (Throwable e) { SolrException.log(log, "core=" + coreName, e); } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 04df6259a4a..d02227a3c3b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -123,6 +123,8 @@ public final class ZkController { protected volatile Overseer overseer; + private String leaderVoteWait; + /** * @param cc * @param zkServerAddress @@ -139,6 +141,27 @@ public final class ZkController { public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort, String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, TimeoutException, IOException { + this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect); + } + + + /** + * @param cc + * @param zkServerAddress + * @param zkClientTimeout + * @param zkClientConnectTimeout + * @param localHost + * @param locaHostPort + * @param localHostContext + * @param leaderVoteWait + * @param registerOnReconnect + * @throws InterruptedException + * @throws TimeoutException + * @throws IOException + */ + public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort, + String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, + TimeoutException, IOException { if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null."); this.cc = cc; if (localHostContext.contains("/")) { @@ -153,6 +176,7 @@ public final class ZkController { this.hostName = getHostNameFromAddress(this.localHost); this.nodeName = this.hostName + ':' + this.localHostPort + '_' + this.localHostContext; this.baseURL = this.localHost + ":" + this.localHostPort + "/" + this.localHostContext; + this.leaderVoteWait = leaderVoteWait; zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, // on reconnect, reload cloud info @@ -257,6 +281,10 @@ public final class ZkController { init(registerOnReconnect); } + public String getLeaderVoteWait() { + return leaderVoteWait; + } + private void registerAllCoresAsDown( final CurrentCoreDescriptorProvider registerOnReconnect) { List descriptors = registerOnReconnect @@ -281,6 +309,22 @@ public final class ZkController { * Closes the underlying ZooKeeper client. */ public void close() { + try { + String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName; + // we don't retry if there is a problem - count on ephem timeout + zkClient.delete(nodePath, -1, false); + } catch (KeeperException.NoNodeException e) { + // fine + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (KeeperException e) { + SolrException.log(log, "Error trying to remove our ephem live node", e); + } + + for (ElectionContext context : electionContexts.values()) { + context.close(); + } + try { overseer.close(); } catch(Throwable t) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 4b3f469f57e..a4b6ebeac7b 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -34,6 +34,9 @@ import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.xml.parsers.ParserConfigurationException; @@ -56,10 +59,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; -import org.apache.solr.common.params.CoreAdminParams; -import org.apache.solr.util.DOMUtil; -import org.apache.solr.util.FileUtils; -import org.apache.solr.util.SystemIdResolver; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef; import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef; import org.apache.solr.handler.admin.CollectionsHandler; @@ -71,6 +71,10 @@ import org.apache.solr.logging.LogWatcher; import org.apache.solr.logging.jul.JulWatcher; import org.apache.solr.schema.IndexSchema; import org.apache.solr.update.SolrCoreState; +import org.apache.solr.util.DOMUtil; +import org.apache.solr.util.DefaultSolrThreadFactory; +import org.apache.solr.util.FileUtils; +import org.apache.solr.util.SystemIdResolver; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,8 +143,10 @@ public class CoreContainer protected LogWatcher logging = null; private String zkHost; private Map coreToOrigName = new ConcurrentHashMap(); + private String leaderVoteWait; - + private ThreadPoolExecutor cmdDistribExecutor; + { log.info("New CoreContainer " + System.identityHashCode(this)); } @@ -184,6 +190,10 @@ public class CoreContainer } protected void initZooKeeper(String zkHost, int zkClientTimeout) { + cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, + TimeUnit.SECONDS, new SynchronousQueue(), + new DefaultSolrThreadFactory("cmdDistribExecutor")); + // if zkHost sys property is not set, we are not using ZooKeeper String zookeeperHost; if(zkHost == null) { @@ -227,7 +237,7 @@ public class CoreContainer } else { log.info("Zookeeper client=" + zookeeperHost); } - zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() { + zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, leaderVoteWait, new CurrentCoreDescriptorProvider() { @Override public List getCurrentDescriptors() { @@ -286,6 +296,11 @@ public class CoreContainer } + // may return null if not in zk mode + public ThreadPoolExecutor getCmdDistribExecutor() { + return cmdDistribExecutor; + } + public Properties getContainerProperties() { return containerProperties; } @@ -456,6 +471,8 @@ public class CoreContainer hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT); host = cfg.get("solr/cores/@host", null); + + leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", null); if(shareSchema){ indexSchemaCache = new ConcurrentHashMap(); @@ -601,6 +618,13 @@ public class CoreContainer if (shardHandlerFactory != null) { shardHandlerFactory.close(); } + if (cmdDistribExecutor != null) { + try { + ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor); + } catch (Throwable e) { + SolrException.log(log, e); + } + } // we want to close zk stuff last if(zkController != null) { zkController.close(); diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 79409fdf0ed..bc7903a09c7 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.LBHttpSolrServer; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.PluginInfo; import org.apache.solr.util.DefaultSolrThreadFactory; @@ -163,7 +164,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug SolrException.log(log, e); } try { - commExecutor.shutdownNow(); + ExecutorUtil.shutdownAndAwaitTermination(commExecutor); } catch (Throwable e) { SolrException.log(log, e); } diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index c00bd0227da..c1ce8fc6e7d 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -17,6 +17,13 @@ package org.apache.solr.handler.component; * limitations under the License. */ +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.StorableField; @@ -30,7 +37,7 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.*; +import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; @@ -54,10 +61,6 @@ import org.apache.solr.util.RefCounted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URL; -import java.util.*; - public class RealTimeGetComponent extends SearchComponent { diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java index 174e0d34114..8c597fe047c 100644 --- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java @@ -29,7 +29,7 @@ import org.apache.solr.util.RefCounted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class DefaultSolrCoreState extends SolrCoreState { +public final class DefaultSolrCoreState extends SolrCoreState implements RecoveryStrategy.RecoveryListener { public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class); private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery"); @@ -43,7 +43,7 @@ public final class DefaultSolrCoreState extends SolrCoreState { private SolrIndexWriter indexWriter = null; private DirectoryFactory directoryFactory; - private boolean recoveryRunning; + private volatile boolean recoveryRunning; private RecoveryStrategy recoveryStrat; private boolean closed = false; @@ -163,6 +163,7 @@ public final class DefaultSolrCoreState extends SolrCoreState { log.error("Error during shutdown of directory factory.", t); } try { + log.info("Closing SolrCoreState - canceling any ongoing recovery"); cancelRecovery(); } catch (Throwable t) { log.error("Error cancelling recovery", t); @@ -210,6 +211,7 @@ public final class DefaultSolrCoreState extends SolrCoreState { } synchronized (recoveryLock) { + log.info("Running recovery - first canceling any ongoing recovery"); cancelRecovery(); while (recoveryRunning) { @@ -229,7 +231,7 @@ public final class DefaultSolrCoreState extends SolrCoreState { // if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery) boolean recoveringAfterStartup = recoveryStrat == null; - recoveryStrat = new RecoveryStrategy(cc, name); + recoveryStrat = new RecoveryStrategy(cc, name, this); recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup); recoveryStrat.start(); recoveryRunning = true; @@ -240,7 +242,7 @@ public final class DefaultSolrCoreState extends SolrCoreState { @Override public void cancelRecovery() { synchronized (recoveryLock) { - if (recoveryStrat != null) { + if (recoveryStrat != null && recoveryRunning) { recoveryStrat.close(); while (true) { try { @@ -257,5 +259,15 @@ public final class DefaultSolrCoreState extends SolrCoreState { } } } + + @Override + public void recovered() { + recoveryRunning = false; + } + + @Override + public void failed() { + recoveryRunning = false; + } } diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 4cd89ee9011..9f9adab283f 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -53,11 +53,6 @@ import org.slf4j.LoggerFactory; public class SolrCmdDistributor { private static final int MAX_RETRIES_ON_FORWARD = 6; public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class); - - // TODO: shut this thing down - static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, - TimeUnit.SECONDS, new SynchronousQueue(), - new DefaultSolrThreadFactory("cmdDistribExecutor"));; static final HttpClient client; static AdjustableSemaphore semaphore = new AdjustableSemaphore(8); @@ -90,7 +85,7 @@ public class SolrCmdDistributor { ModifiableSolrParams params; } - public SolrCmdDistributor(int numHosts) { + public SolrCmdDistributor(int numHosts, ThreadPoolExecutor executor) { int maxPermits = Math.max(8, (numHosts - 1) * 8); // limits how many tasks can actually execute at once @@ -98,7 +93,7 @@ public class SolrCmdDistributor { semaphore.setMaxPermits(maxPermits); } - completionService = new ExecutorCompletionService(commExecutor); + completionService = new ExecutorCompletionService(executor); pending = new HashSet>(); } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 928c8f9561b..81c2b138751 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -164,15 +164,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } //this.rsp = reqInfo != null ? reqInfo.getRsp() : null; - - cloudDesc = coreDesc.getCloudDescriptor(); if (cloudDesc != null) { collection = cloudDesc.getCollectionName(); } - cmdDistrib = new SolrCmdDistributor(numNodes); + cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getCmdDistribExecutor()); } private List setupRequest(int hash) { @@ -851,6 +849,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // forward to all replicas if (leaderLogic && replicas != null) { + if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) { + log.error("Abort sending request to replicas, we are no longer leader"); + throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader"); + } ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 83e5415fc46..2987fa56f8b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -164,11 +164,14 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase // have request fails checkShardConsistency(false, true); + long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults() + .getNumFound(); + // ensure we have added more than 0 docs long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")) .getResults().getNumFound(); - assertTrue(cloudClientDocs > 0); + assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0); if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults() diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java index d20c68c7ffc..206f92bb43c 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java +++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java @@ -21,6 +21,9 @@ import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.solr.BaseDistributedSearchTestCase; import org.apache.solr.client.solrj.SolrQuery; @@ -37,9 +40,12 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.update.SolrCmdDistributor.Node; import org.apache.solr.update.SolrCmdDistributor.Response; import org.apache.solr.update.SolrCmdDistributor.StdNode; +import org.apache.solr.util.DefaultSolrThreadFactory; public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { - + private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, + TimeUnit.SECONDS, new SynchronousQueue(), + new DefaultSolrThreadFactory("cmdDistribExecutor")); public SolrCmdDistributorTest() { fixShardCount = true; @@ -85,7 +91,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { public void doTest() throws Exception { del("*:*"); - SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8); + SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8, executor); ModifiableSolrParams params = new ModifiableSolrParams(); List nodes = new ArrayList(); @@ -119,7 +125,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps))); // add another 2 docs to control and 3 to client - cmdDistrib = new SolrCmdDistributor(8); + cmdDistrib = new SolrCmdDistributor(8, executor); cmd.solrDoc = sdoc("id", 2); cmdDistrib.distribAdd(cmd, nodes, params); @@ -152,7 +158,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null); dcmd.id = "2"; - cmdDistrib = new SolrCmdDistributor(8); + cmdDistrib = new SolrCmdDistributor(8, executor); cmdDistrib.distribDelete(dcmd, nodes, params); cmdDistrib.distribCommit(ccmd, nodes, params); @@ -180,7 +186,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { int id = 5; - cmdDistrib = new SolrCmdDistributor(8); + cmdDistrib = new SolrCmdDistributor(8, executor); nodes.clear(); int cnt = atLeast(200); diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java new file mode 100644 index 00000000000..6ff31875f84 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -0,0 +1,48 @@ +package org.apache.solr.common.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.common.SolrException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ExecutorUtil { + public static Logger log = LoggerFactory.getLogger(ExecutorUtil.class); + + public static void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) + SolrException.log(log, "Executor still has running tasks."); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } +}