mirror of https://github.com/apache/lucene.git
SOLR-3871: SyncStrategy should use an executor for the threads it creates to request recoveries.
SOLR-3870: SyncStrategy should have a close so it can abort earlier on shutdown. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1389173 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c656ba11bf
commit
72f7c4f9dc
|
@ -171,6 +171,7 @@ Optimizations
|
|||
* SOLR-3709: Cache the url list created from the ClusterState in CloudSolrServer
|
||||
on each request. (Mark Miller)
|
||||
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
@ -392,6 +393,12 @@ Other Changes
|
|||
* SOLR-3815: SolrCloud - Add properties such as "range" to shards, which changes
|
||||
the clusterstate.json and puts the shard replicas under "replicas". (yonik)
|
||||
|
||||
* SOLR-3871: SyncStrategy should use an executor for the threads it creates to
|
||||
request recoveries. (Mark Miller)
|
||||
|
||||
* SOLR-3870: SyncStrategy should have a close so it can abort earlier on
|
||||
shutdown. (Mark Miller)
|
||||
|
||||
|
||||
================== 4.0.0-BETA ===================
|
||||
|
||||
|
|
|
@ -122,6 +122,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
@Override
|
||||
public void close() {
|
||||
this.isClosed = true;
|
||||
syncStrategy.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,9 @@ package org.apache.solr.cloud;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
|
@ -32,6 +35,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
|
|||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
||||
|
@ -39,6 +43,7 @@ import org.apache.solr.handler.component.ShardHandler;
|
|||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.handler.component.ShardResponse;
|
||||
import org.apache.solr.update.PeerSync;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -49,6 +54,13 @@ public class SyncStrategy {
|
|||
|
||||
private final ShardHandler shardHandler;
|
||||
|
||||
private ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
|
||||
0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
|
||||
"recoveryCmdExecutor"));
|
||||
|
||||
private volatile boolean isClosed;
|
||||
|
||||
private final static HttpClient client;
|
||||
static {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
@ -95,6 +107,10 @@ public class SyncStrategy {
|
|||
String collection = cloudDesc.getCollectionName();
|
||||
String shardId = cloudDesc.getShardId();
|
||||
|
||||
if (isClosed) {
|
||||
log.info("We have been closed, won't sync with replicas");
|
||||
return false;
|
||||
}
|
||||
// if no one that is up is active, we are willing to wait...
|
||||
// we don't want a recovering node to become leader and then
|
||||
// a better candidate pops up a second later.
|
||||
|
@ -118,7 +134,11 @@ public class SyncStrategy {
|
|||
SolrException.log(log, "Sync Failed", e);
|
||||
}
|
||||
try {
|
||||
|
||||
if (isClosed) {
|
||||
log.info("We have been closed, won't attempt to sync replicas back to leader");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (success) {
|
||||
log.info("Sync Success - now sync replicas to me");
|
||||
|
||||
|
@ -199,9 +219,11 @@ public class SyncStrategy {
|
|||
if (!success) {
|
||||
try {
|
||||
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover.");
|
||||
|
||||
requestRecovery(leaderProps, ((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
|
||||
|
||||
if (isClosed) {
|
||||
log.info("We have been closed, don't request that a replica recover");
|
||||
} else {
|
||||
requestRecovery(leaderProps, ((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
|
||||
}
|
||||
|
@ -247,6 +269,11 @@ public class SyncStrategy {
|
|||
shardHandler.submit(sreq, replica, sreq.params);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
this.isClosed = true;
|
||||
ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
|
||||
}
|
||||
|
||||
private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
|
||||
// TODO: do this in background threads
|
||||
Thread thread = new Thread() {
|
||||
|
@ -269,7 +296,7 @@ public class SyncStrategy {
|
|||
}
|
||||
}
|
||||
};
|
||||
thread.run();
|
||||
recoveryCmdExecutor.execute(thread);
|
||||
}
|
||||
|
||||
public static ModifiableSolrParams params(String... params) {
|
||||
|
|
|
@ -818,11 +818,12 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
|
||||
}
|
||||
SolrCore core = null;
|
||||
SyncStrategy syncStrategy = null;
|
||||
try {
|
||||
core = coreContainer.getCore(cname);
|
||||
if (core != null) {
|
||||
SyncStrategy syncStrategy = new SyncStrategy();
|
||||
|
||||
syncStrategy = new SyncStrategy();
|
||||
|
||||
Map<String,Object> props = new HashMap<String,Object>();
|
||||
props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, cname);
|
||||
|
@ -855,6 +856,9 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
if (core != null) {
|
||||
core.close();
|
||||
}
|
||||
if (syncStrategy != null) {
|
||||
syncStrategy.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue