SOLR-3772,SOLR-3750: make optional, wait time configurable, default to off

SOLR-3782: A leader going down while updates are coming in can cause shard inconsistency. 

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1380299 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-09-03 17:14:25 +00:00
parent 7af7d1de48
commit 278200cfaa
13 changed files with 274 additions and 103 deletions

View File: solr/CHANGES.txt

@ -103,17 +103,12 @@ Bug Fixes
* SOLR-3770: Overseer may lose updates to cluster state (siren)
-* SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at
-  the same time if the recovery thread join call was interrupted.
+* SOLR-3721: Fix bug that could theoretically allow multiple recoveries to run
+  briefly at the same time if the recovery thread join call was interrupted.
(Per Steffensen, Mark Miller)
-* SOLR-3750: On session expiration, we should explicitly wait some time before
-  running the leader sync process so that we are sure every node participates.
-  (Per Steffensen, Mark Miller)
-* SOLR-3772: On cluster startup, we should wait until we see all registered
-  replicas before running the leader process - or if they all do not come up,
-  N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
* SOLR-3782: A leader going down while updates are coming in can cause shard
inconsistency. (Mark Miller)
Other Changes
----------------------
@ -144,7 +139,14 @@ Other Changes
* SOLR-3780: Maven build: Make solrj tests run separately from solr-core.
(Steve Rowe)
* SOLR-3772: Optionally, on cluster startup, we can wait until we see all registered
replicas before running the leader process - or if they all do not come up,
N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
* SOLR-3750: Optionally, on session expiration, we can explicitly wait some time before
running the leader sync process so that we are sure every node participates.
(Per Steffensen, Mark Miller)
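
Both entries above are driven by the new leaderVoteWait attribute that CoreContainer reads from solr.xml via cfg.get("solr/cores/@leaderVoteWait", null) later in this commit; when the attribute is absent, the wait is skipped entirely. A minimal sketch of opting in - the placement on the <cores> element follows that XPath, and 10000 is an illustrative value in milliseconds, since the election code hands the string to Integer.parseInt and adds the result to System.currentTimeMillis():

<solr>
  <cores adminPath="/admin/cores" leaderVoteWait="10000">
    <core name="collection1" instanceDir="collection1" />
  </cores>
</solr>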
================== 4.0.0-BETA ===================

View File: solr/core/src/java/org/apache/solr/cloud/ElectionContext.java

@ -55,6 +55,8 @@ public abstract class ElectionContext {
this.zkClient = zkClient;
}
public void close() {}
public void cancelElection() throws InterruptedException, KeeperException {
zkClient.delete(leaderSeqPath, -1, true);
}
@ -83,10 +85,6 @@ class ShardLeaderElectionContextBase extends ElectionContext {
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
-// this pause is important
-// but I don't know why yet :*( - it must come before this publish call
-// and can happen at the start of leader election process even
-Thread.sleep(100);
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
@ -112,6 +110,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private SyncStrategy syncStrategy = new SyncStrategy();
private boolean afterExpiration;
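// set by close() so an election context that has been shut down (e.g. on session expiration or core shutdown) bails out of the leader process in shouldIBeLeader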
private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
@ -123,9 +123,16 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
this.afterExpiration = afterExpiration;
}
@Override
public void close() {
this.isClosed = true;
}
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
log.info("Running the leader process. afterExpiration=" + afterExpiration);
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
// clear the leader in clusterstate
@ -134,21 +141,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
collection);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
-waitForReplicasToComeUp(weAreReplacement);
-// wait for local leader state to clear...
-// int tries = 0;
-// while (zkController.getClusterState().getLeader(collection, shardId) !=
-// null) {
-// System.out.println("leader still shown " + tries + " " +
-// zkController.getClusterState().getLeader(collection, shardId));
-// Thread.sleep(1000);
-// tries++;
-// if (tries == 30) {
-// break;
-// }
-// }
-// Thread.sleep(1000);
String leaderVoteWait = cc.getZkController().getLeaderVoteWait();
if (leaderVoteWait != null) {
waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
}
SolrCore core = null;
try {
@ -238,14 +234,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
-private void waitForReplicasToComeUp(boolean weAreReplacement)
+private void waitForReplicasToComeUp(boolean weAreReplacement, String leaderVoteWait)
throws InterruptedException {
-int retries = 300; // ~ 5 min
-boolean tryAgain = true;
+int timeout = Integer.parseInt(leaderVoteWait);
+long timeoutAt = System.currentTimeMillis() + timeout;
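// leaderVoteWait arrives as a raw String from solr.xml and is treated as a number of milliseconds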
Slice slices = zkController.getClusterState().getSlice(collection, shardId);
log.info("Running the leader process. afterExperiation=" + afterExpiration);
while (tryAgain || slices == null) {
while (true && !isClosed) {
// wait for everyone to be up
if (slices != null) {
Map<String,ZkNodeProps> shards = slices.getShards();
@ -265,24 +261,23 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if ((afterExpiration || !weAreReplacement)
&& found >= slices.getShards().size()) {
log.info("Enough replicas found to continue.");
-tryAgain = false;
+break;
} else if (!afterExpiration && found >= slices.getShards().size() - 1) {
// a previous leader went down - wait for one less than the total
// known shards
log.info("Enough replicas found to continue.");
-tryAgain = false;
+break;
} else {
log.info("Waiting until we see more replicas up");
log.info("Waiting until we see more replicas up: total=" + slices.getShards().size() + " found=" + found + " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
}
-retries--;
-if (retries == 0) {
+if (System.currentTimeMillis() > timeoutAt) {
log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
break;
}
}
-if (tryAgain) {
-Thread.sleep(1000);
+Thread.sleep(500);
slices = zkController.getClusterState().getSlice(collection, shardId);
}
}
@ -306,6 +301,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
log.info("Checking if I should try and be the leader.");
if (isClosed) {
log.info("Bailing on leader process because we have been closed");
return false;
}
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
Map<String,Slice> slices = clusterState.getSlices(this.collection);
Slice slice = slices.get(shardId);

View File: solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java

@ -65,8 +65,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
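// notified once per recovery attempt: recovered() on success, failed() when retries are exhausted, so the owner can track whether a recovery is still running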
public static interface RecoveryListener {
public void recovered();
public void failed();
}
private volatile boolean close = false;
private RecoveryListener recoveryListener;
private ZkController zkController;
private String baseUrl;
private String coreZkNodeName;
@ -76,9 +82,10 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
private boolean recoveringAfterStartup;
private CoreContainer cc;
-public RecoveryStrategy(CoreContainer cc, String name) {
+public RecoveryStrategy(CoreContainer cc, String name, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = name;
this.recoveryListener = recoveryListener;
setName("RecoveryThread");
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
@ -93,7 +100,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// make sure any threads stop retrying
public void close() {
close = true;
log.warn("Stopping recovery for core " + coreName + " zkNodeName=" + coreZkNodeName);
log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName );
}
@ -105,6 +112,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
} finally {
close();
recoveryListener.failed();
}
}
@ -210,15 +218,15 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try {
doRecovery(core);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
} catch (Throwable t) {
log.error("", t);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", t);
}
} finally {
if (core != null) core.close();
@ -258,38 +266,52 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
List<Long> startingVersions = ulog.getStartingVersions();
if (startingVersions != null && recoveringAfterStartup) {
-int oldIdx = 0; // index of the start of the old list in the current list
-long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-for (; oldIdx<recentVersions.size(); oldIdx++) {
-if (recentVersions.get(oldIdx) == firstStartingVersion) break;
try {
int oldIdx = 0; // index of the start of the old list in the current
// list
long firstStartingVersion = startingVersions.size() > 0 ? startingVersions
.get(0) : 0;
for (; oldIdx < recentVersions.size(); oldIdx++) {
if (recentVersions.get(oldIdx) == firstStartingVersion) break;
}
if (oldIdx > 0) {
log.info("####### Found new versions added after startup: num="
+ oldIdx);
log.info("###### currentVersions=" + recentVersions);
}
log.info("###### startupVersions=" + startingVersions);
} catch (Throwable t) {
SolrException.log(log, "Error getting recent versions. core=" + coreName, t);
recentVersions = new ArrayList<Long>(0);
}
-if (oldIdx > 0) {
-log.info("####### Found new versions added after startup: num=" + oldIdx);
-log.info("###### currentVersions=" + recentVersions);
-}
-log.info("###### startupVersions=" + startingVersions);
}
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down. We may have received updates since then.
recentVersions = startingVersions;
-if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-// last operation at the time of startup had the GAP flag set...
-// this means we were previously doing a full index replication
-// that probably didn't complete and buffering updates in the meantime.
-log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + coreName);
-firstTime = false; // skip peersync
try {
if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
// last operation at the time of startup had the GAP flag set...
// this means we were previously doing a full index replication
// that probably didn't complete and buffering updates in the
// meantime.
log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core="
+ coreName);
firstTime = false; // skip peersync
}
} catch (Throwable t) {
SolrException.log(log, "Error trying to get ulog starting operation. core="
+ coreName, t);
firstTime = false; // skip peersync
}
}
-while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though
+while (!successfulRecovery && !isInterrupted()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
@ -393,6 +415,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
close = true;
successfulRecovery = true;
recoveryListener.recovered();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted", e);
@ -421,10 +444,17 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try {
log.error("Recovery failed - trying again... core=" + coreName);
if (isClosed()) {
retries = INTERRUPTED;
}
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
SolrException.log(log, "Recovery failed - interrupted. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} else {
SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
@ -433,7 +463,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
break;
}
-} catch (Exception e) {
+} catch (Throwable e) {
SolrException.log(log, "core=" + coreName, e);
}

View File: solr/core/src/java/org/apache/solr/cloud/ZkController.java

@ -123,6 +123,8 @@ public final class ZkController {
protected volatile Overseer overseer;
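// raw leaderVoteWait value from solr.xml; null (the default) means leader elections do not wait for replicas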
private String leaderVoteWait;
/**
* @param cc
* @param zkServerAddress
@ -139,6 +141,27 @@ public final class ZkController {
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect);
}
/**
* @param cc
* @param zkServerAddress
* @param zkClientTimeout
* @param zkClientConnectTimeout
* @param localHost
* @param locaHostPort
* @param localHostContext
* @param leaderVoteWait
* @param registerOnReconnect
* @throws InterruptedException
* @throws TimeoutException
* @throws IOException
*/
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
if (localHostContext.contains("/")) {
@ -153,6 +176,7 @@ public final class ZkController {
this.hostName = getHostNameFromAddress(this.localHost);
this.nodeName = this.hostName + ':' + this.localHostPort + '_' + this.localHostContext;
this.baseURL = this.localHost + ":" + this.localHostPort + "/" + this.localHostContext;
this.leaderVoteWait = leaderVoteWait;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info
@ -257,6 +281,10 @@ public final class ZkController {
init(registerOnReconnect);
}
public String getLeaderVoteWait() {
return leaderVoteWait;
}
private void registerAllCoresAsDown(
final CurrentCoreDescriptorProvider registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect
@ -281,6 +309,22 @@ public final class ZkController {
* Closes the underlying ZooKeeper client.
*/
public void close() {
try {
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
// we don't retry if there is a problem - count on ephem timeout
zkClient.delete(nodePath, -1, false);
} catch (KeeperException.NoNodeException e) {
// fine
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException e) {
SolrException.log(log, "Error trying to remove our ephem live node", e);
}
for (ElectionContext context : electionContexts.values()) {
context.close();
}
try {
overseer.close();
} catch(Throwable t) {

View File: solr/core/src/java/org/apache/solr/core/CoreContainer.java

@ -34,6 +34,9 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.parsers.ParserConfigurationException;
@ -56,10 +59,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.util.DOMUtil;
-import org.apache.solr.util.FileUtils;
-import org.apache.solr.util.SystemIdResolver;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
import org.apache.solr.handler.admin.CollectionsHandler;
@ -71,6 +71,10 @@ import org.apache.solr.logging.LogWatcher;
import org.apache.solr.logging.jul.JulWatcher;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.util.DOMUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.SystemIdResolver;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -139,8 +143,10 @@ public class CoreContainer
protected LogWatcher logging = null;
private String zkHost;
private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
private String leaderVoteWait;
private ThreadPoolExecutor cmdDistribExecutor;
{
log.info("New CoreContainer " + System.identityHashCode(this));
}
@ -184,6 +190,10 @@ public class CoreContainer
}
protected void initZooKeeper(String zkHost, int zkClientTimeout) {
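// this executor replaces the static commExecutor formerly owned by SolrCmdDistributor ("TODO: shut this thing down"), so that shutdown() can terminate it cleanly via ExecutorUtil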
cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
// if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
if(zkHost == null) {
@ -227,7 +237,7 @@ public class CoreContainer
} else {
log.info("Zookeeper client=" + zookeeperHost);
}
-zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, leaderVoteWait, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
@ -286,6 +296,11 @@ public class CoreContainer
}
// may return null if not in zk mode
public ThreadPoolExecutor getCmdDistribExecutor() {
return cmdDistribExecutor;
}
public Properties getContainerProperties() {
return containerProperties;
}
@ -456,6 +471,8 @@ public class CoreContainer
hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
host = cfg.get("solr/cores/@host", null);
leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", null);
if(shareSchema){
indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
@ -601,6 +618,13 @@ public class CoreContainer
if (shardHandlerFactory != null) {
shardHandlerFactory.close();
}
if (cmdDistribExecutor != null) {
try {
ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
}
}
// we want to close zk stuff last
if(zkController != null) {
zkController.close();

View File: solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java

@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.util.DefaultSolrThreadFactory;
@ -163,7 +164,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements Plug
SolrException.log(log, e);
}
try {
-commExecutor.shutdownNow();
+ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
}

View File: solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java

@ -17,6 +17,13 @@ package org.apache.solr.handler.component;
* limitations under the License.
*/
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.StorableField;
@ -30,7 +37,7 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
@ -54,10 +61,6 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.URL;
-import java.util.*;
public class RealTimeGetComponent extends SearchComponent
{

View File: solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java

@ -29,7 +29,7 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class DefaultSolrCoreState extends SolrCoreState {
+public final class DefaultSolrCoreState extends SolrCoreState implements RecoveryStrategy.RecoveryListener {
public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
@ -43,7 +43,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
-private boolean recoveryRunning;
+private volatile boolean recoveryRunning;
private RecoveryStrategy recoveryStrat;
private boolean closed = false;
@ -163,6 +163,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
log.error("Error during shutdown of directory factory.", t);
}
try {
log.info("Closing SolrCoreState - canceling any ongoing recovery");
cancelRecovery();
} catch (Throwable t) {
log.error("Error cancelling recovery", t);
@ -210,6 +211,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
}
synchronized (recoveryLock) {
log.info("Running recovery - first canceling any ongoing recovery");
cancelRecovery();
while (recoveryRunning) {
@ -229,7 +231,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
boolean recoveringAfterStartup = recoveryStrat == null;
-recoveryStrat = new RecoveryStrategy(cc, name);
+recoveryStrat = new RecoveryStrategy(cc, name, this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
recoveryStrat.start();
recoveryRunning = true;
@ -240,7 +242,7 @@ public final class DefaultSolrCoreState extends SolrCoreState {
@Override
public void cancelRecovery() {
synchronized (recoveryLock) {
-if (recoveryStrat != null) {
+if (recoveryStrat != null && recoveryRunning) {
recoveryStrat.close();
while (true) {
try {
@ -257,5 +259,15 @@ public final class DefaultSolrCoreState extends SolrCoreState {
}
}
}
@Override
public void recovered() {
recoveryRunning = false;
}
@Override
public void failed() {
recoveryRunning = false;
}
}

View File: solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java

@ -53,11 +53,6 @@ import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 6;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-// TODO: shut this thing down
-static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
-TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-new DefaultSolrThreadFactory("cmdDistribExecutor"));;
static final HttpClient client;
static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
@ -90,7 +85,7 @@ public class SolrCmdDistributor {
ModifiableSolrParams params;
}
-public SolrCmdDistributor(int numHosts) {
+public SolrCmdDistributor(int numHosts, ThreadPoolExecutor executor) {
int maxPermits = Math.max(8, (numHosts - 1) * 8);
// limits how many tasks can actually execute at once
@ -98,7 +93,7 @@ public class SolrCmdDistributor {
semaphore.setMaxPermits(maxPermits);
}
-completionService = new ExecutorCompletionService<Request>(commExecutor);
+completionService = new ExecutorCompletionService<Request>(executor);
pending = new HashSet<Future<Request>>();
}

View File: solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

@ -164,15 +164,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
cloudDesc = coreDesc.getCloudDescriptor();
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
}
-cmdDistrib = new SolrCmdDistributor(numNodes);
+cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getCmdDistribExecutor());
}
private List<Node> setupRequest(int hash) {
@ -851,6 +849,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// forward to all replicas
if (leaderLogic && replicas != null) {
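// SOLR-3782: re-check leadership before fanning out, so a leader that has just gone down cannot forward updates and leave the shard inconsistent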
if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
log.error("Abort sending request to replicas, we are no longer leader");
throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader");
}
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());

View File: solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java

@ -164,11 +164,14 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// have request fails
checkShardConsistency(false, true);
long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// ensure we have added more than 0 docs
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
.getResults().getNumFound();
-assertTrue(cloudClientDocs > 0);
+assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
if (VERBOSE) System.out.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults()

View File: solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java

@ -21,6 +21,9 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
@ -37,9 +40,12 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.Response;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.util.DefaultSolrThreadFactory;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
public SolrCmdDistributorTest() {
fixShardCount = true;
@ -85,7 +91,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
public void doTest() throws Exception {
del("*:*");
-SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
+SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8, executor);
ModifiableSolrParams params = new ModifiableSolrParams();
List<Node> nodes = new ArrayList<Node>();
@ -119,7 +125,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
-cmdDistrib = new SolrCmdDistributor(8);
+cmdDistrib = new SolrCmdDistributor(8, executor);
cmd.solrDoc = sdoc("id", 2);
cmdDistrib.distribAdd(cmd, nodes, params);
@ -152,7 +158,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
-cmdDistrib = new SolrCmdDistributor(8);
+cmdDistrib = new SolrCmdDistributor(8, executor);
cmdDistrib.distribDelete(dcmd, nodes, params);
cmdDistrib.distribCommit(ccmd, nodes, params);
@ -180,7 +186,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
int id = 5;
-cmdDistrib = new SolrCmdDistributor(8);
+cmdDistrib = new SolrCmdDistributor(8, executor);
nodes.clear();
int cnt = atLeast(200);

View File: solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java (new file)

@ -0,0 +1,48 @@
package org.apache.solr.common.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExecutorUtil {
public static Logger log = LoggerFactory.getLogger(ExecutorUtil.class);
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
SolrException.log(log, "Executor still has running tasks.");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
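
A minimal usage sketch (the pool and task here are hypothetical; CoreContainer.shutdown() and HttpShardHandlerFactory.close() in this commit invoke it the same way):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService pool = Executors.newCachedThreadPool();
pool.submit(new Runnable() {
  public void run() { /* hypothetical work */ }
});
// stops accepting new tasks, waits up to 60 seconds, then cancels stragglers and waits up to 60 seconds more
ExecutorUtil.shutdownAndAwaitTermination(pool);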