SOLR-8416: The collections create API should return after all replicas are active.

This commit is contained in:
markrmiller 2016-02-17 13:37:08 -05:00
parent 25931d3624
commit 31437c9b43
8 changed files with 176 additions and 19 deletions

View File

@ -176,6 +176,9 @@ Bug Fixes
* SOLR-8683: Always consume the full request on the server, not just in the case of an error.
(Mark Miller)
* SOLR-8416: The collections create API should return after all replicas are active.
(Michael Sun, Mark Miller, Alexey Serba)
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

View File

@ -75,6 +75,8 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.handler.admin.ClusterStatus;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
@ -1885,6 +1887,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
final String collectionName = message.getStr(NAME);
log.info("Create collection {}", collectionName);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
@ -1995,7 +1998,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
collectionName, shardNames, repFactor));
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
for (Map.Entry<Position, String> e : positionVsNodes.entrySet()) {
Position position = e.getKey();
String nodeName = e.getValue();
@ -2036,6 +2039,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
sreq.nodeName = nodeName;
params.set("qt", adminPath);
sreq.purpose = 1;
sreq.shards = new String[]{baseUrl};

View File

@ -45,8 +45,15 @@ public class CloudConfig {
private final String zkCredentialsProviderClass;
private final String zkACLProviderClass;
private final int createCollectionWaitTimeTillActive;
private final boolean createCollectionCheckLeaderActive;
CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames, int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration, int autoReplicaFailoverWorkLoopDelay, int autoReplicaFailoverBadNodeExpiration, String zkCredentialsProviderClass, String zkACLProviderClass) {
CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames,
int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
int autoReplicaFailoverWorkLoopDelay, int autoReplicaFailoverBadNodeExpiration, String zkCredentialsProviderClass,
String zkACLProviderClass, int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive) {
this.zkHost = zkHost;
this.zkClientTimeout = zkClientTimeout;
this.hostPort = hostPort;
@ -60,6 +67,8 @@ public class CloudConfig {
this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
this.zkCredentialsProviderClass = zkCredentialsProviderClass;
this.zkACLProviderClass = zkACLProviderClass;
this.createCollectionWaitTimeTillActive = createCollectionWaitTimeTillActive;
this.createCollectionCheckLeaderActive = createCollectionCheckLeaderActive;
if (this.hostPort == -1)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'hostPort' must be configured to run SolrCloud");
@ -119,12 +128,22 @@ public class CloudConfig {
return useGenericCoreNames;
}
public int getCreateCollectionWaitTimeTillActive() {
return createCollectionWaitTimeTillActive;
}
public boolean isCreateCollectionCheckLeaderActive() {
return createCollectionCheckLeaderActive;
}
public static class CloudConfigBuilder {
private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
private static final int DEFAULT_LEADER_VOTE_WAIT = 180000; // 3 minutes
private static final int DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT = 180000;
private static final int DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT = 30; // 30 seconds
private static final boolean DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE = false;
// TODO: tune defaults
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 30000;
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY = 10000;
@ -143,6 +162,8 @@ public class CloudConfig {
private int autoReplicaFailoverBadNodeExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION;
private String zkCredentialsProviderClass;
private String zkACLProviderClass;
private int createCollectionWaitTimeTillActive = DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT;
private boolean createCollectionCheckLeaderActive = DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE;
public CloudConfigBuilder(String hostName, int hostPort) {
this(hostName, hostPort, null);
@ -204,8 +225,21 @@ public class CloudConfig {
return this;
}
public CloudConfigBuilder setCreateCollectionWaitTimeTillActive(int createCollectionWaitTimeTillActive) {
this.createCollectionWaitTimeTillActive = createCollectionWaitTimeTillActive;
return this;
}
public CloudConfigBuilder setCreateCollectionCheckLeaderActive(boolean createCollectionCheckLeaderActive) {
this.createCollectionCheckLeaderActive = createCollectionCheckLeaderActive;
return this;
}
public CloudConfig build() {
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait, leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, autoReplicaFailoverWorkLoopDelay, autoReplicaFailoverBadNodeExpiration, zkCredentialsProviderClass, zkACLProviderClass);
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, autoReplicaFailoverWorkLoopDelay,
autoReplicaFailoverBadNodeExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
createCollectionCheckLeaderActive);
}
}
}

View File

@ -368,6 +368,12 @@ public class SolrXmlConfig {
case "zkCredentialsProvider":
builder.setZkCredentialsProviderClass(value);
break;
case "createCollectionWaitTimeTillActive":
builder.setCreateCollectionWaitTimeTillActive(parseInt(name, value));
break;
case "createCollectionCheckLeaderActive":
builder.setCreateCollectionCheckLeaderActive(Boolean.parseBoolean(value));
break;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown configuration parameter in <solrcloud> section of solr.xml: " + name);
}

View File

@ -54,6 +54,7 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -101,6 +102,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.BlobHandler;
import org.apache.solr.handler.RequestHandlerBase;
@ -178,8 +180,9 @@ public class CollectionsHandler extends RequestHandlerBase {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
}
CollectionOperation operation = CollectionOperation.get(action);
log.info("Invoked Collection Action :{} with params {} ", action.toLower(), req.getParamString());
log.info("Invoked Collection Action :{} with params {} and sendToOCPQueue={}", action.toLower(), req.getParamString(), operation.sendToOCPQueue);
SolrResponse response = null;
Map<String, Object> props = operation.call(req, rsp, this);
String asyncId = req.getParams().get(ASYNC);
if (props != null) {
@ -188,8 +191,16 @@ public class CollectionsHandler extends RequestHandlerBase {
}
props.put(QUEUE_OPERATION, operation.action.toLower());
ZkNodeProps zkProps = new ZkNodeProps(props);
if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut);
if (operation.sendToOCPQueue) {
response = handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut);
}
else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
final String collectionName = zkProps.getStr(NAME);
if (action.equals(CollectionAction.CREATE) && asyncId == null) {
if (rsp.getException() == null) {
waitForActiveCollection(collectionName, zkProps, cores, response);
}
}
}
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
@ -202,18 +213,18 @@ public class CollectionsHandler extends RequestHandlerBase {
static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
handleResponse(operation, m, rsp, DEFAULT_COLLECTION_OP_TIMEOUT);
}
private void handleResponse(String operation, ZkNodeProps m,
private SolrResponse handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
if (m.containsKey(ASYNC) && m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC);
@ -238,7 +249,7 @@ public class CollectionsHandler extends RequestHandlerBase {
rsp.getValues().addAll(response.getResponse());
return;
return response;
}
QueueEvent event = coreContainer.getZkController()
@ -252,6 +263,7 @@ public class CollectionsHandler extends RequestHandlerBase {
Integer code = (Integer) exp.get("rspCode");
rsp.setException(new SolrException(code != null && code != -1 ? ErrorCode.getErrorCode(code) : ErrorCode.SERVER_ERROR, (String)exp.get("msg")));
}
return response;
} else {
if (System.nanoTime() - time >= TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
throw new SolrException(ErrorCode.SERVER_ERROR, operation
@ -419,7 +431,7 @@ public class CollectionsHandler extends RequestHandlerBase {
}
},
SPLITSHARD_OP(SPLITSHARD, DEFAULT_ZK_TIMEOUT * 5, true) {
SPLITSHARD_OP(SPLITSHARD, DEFAULT_COLLECTION_OP_TIMEOUT * 5, true) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
throws Exception {
@ -725,7 +737,7 @@ public class CollectionsHandler extends RequestHandlerBase {
return null;
}
},
MODIFYCOLLECTION_OP(MODIFYCOLLECTION, DEFAULT_ZK_TIMEOUT, false) {
MODIFYCOLLECTION_OP(MODIFYCOLLECTION, DEFAULT_COLLECTION_OP_TIMEOUT, false) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
@ -752,7 +764,7 @@ public class CollectionsHandler extends RequestHandlerBase {
boolean sendToOCPQueue;
CollectionOperation(CollectionAction action) {
this(action, DEFAULT_ZK_TIMEOUT, true);
this(action, DEFAULT_COLLECTION_OP_TIMEOUT, true);
}
CollectionOperation(CollectionAction action, long timeOut, boolean sendToOCPQueue) {
@ -851,6 +863,72 @@ public class CollectionsHandler extends RequestHandlerBase {
}
}
private static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
throws KeeperException, InterruptedException {
if (response.getResponse().get("exception") != null) {
// the main called failed, don't wait
log.info("Not waiting for active collection due to exception: " + response.getResponse().get("exception"));
return;
}
if (response.getResponse().get("failure") != null) {
// TODO: we should not wait for Replicas we know failed
}
String replicaNotAlive = null;
String replicaState = null;
String nodeNotLive = null;
CloudConfig ccfg = cc.getConfig().getCloudConfig();
Integer numRetries = ccfg.getCreateCollectionWaitTimeTillActive();
Boolean checkLeaderOnly = ccfg.isCreateCollectionCheckLeaderActive();
log.info("Wait for new collection to be active for at most " + numRetries + " seconds. Check all shard "
+ (checkLeaderOnly ? "leaders" : "replicas"));
ZkStateReader zkStateReader = cc.getZkController().getZkStateReader();
for (int i = 0; i < numRetries; i++) {
zkStateReader.updateClusterState();
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> shards = clusterState.getSlices(collectionName);
if (shards != null) {
replicaNotAlive = null;
for (Slice shard : shards) {
Collection<Replica> replicas;
if (!checkLeaderOnly) replicas = shard.getReplicas();
else {
replicas = new ArrayList<Replica>();
replicas.add(shard.getLeader());
}
for (Replica replica : replicas) {
String state = replica.getStr(ZkStateReader.STATE_PROP);
log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
replica.getCoreUrl(), state);
if (!clusterState.liveNodesContain(replica.getNodeName())
|| !state.equals(Replica.State.ACTIVE.toString())) {
replicaNotAlive = replica.getCoreUrl();
nodeNotLive = replica.getNodeName();
replicaState = state;
break;
}
}
if (replicaNotAlive != null) break;
}
if (replicaNotAlive == null) return;
}
Thread.sleep(1000);
}
if (nodeNotLive != null && replicaState != null) {
log.error("Timed out waiting for new collection's replicas to become ACTIVE "
+ (replicaState.equals(Replica.State.ACTIVE.toString()) ? "node " + nodeNotLive + " is not live"
: "replica " + replicaNotAlive + " is in state of " + replicaState.toString()) + " with timeout=" + numRetries);
} else {
log.error("Timed out waiting for new collection's replicas to become ACTIVE with timeout=" + numRetries);
}
}
public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
List l = (List) m.get(RULE);
if (l != null) {

View File

@ -110,6 +110,8 @@ enum CoreAdminOperation {
CREATE_OP(CREATE) {
@Override
public void call(CallInfo callInfo) {
assert TestInjection.injectRandomDelayInCoreCreation();
SolrParams params = callInfo.req.getParams();
log.info("core create command {}", params);
String coreName = params.required().get(CoreAdminParams.NAME);

View File

@ -19,7 +19,6 @@ package org.apache.solr.util;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
@ -72,12 +71,16 @@ public class TestInjection {
public static String failReplicaRequests = null;
public static String failUpdateRequests = null;
public static String nonExistentCoreExceptionAfterUnload = null;
public static String updateLogReplayRandomPause = null;
public static String updateRandomPause = null;
public static String randomDelayInCoreCreation = null;
public static int randomDelayMaxInCoreCreationInSec = 10;
private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
@ -88,12 +91,31 @@ public class TestInjection {
nonExistentCoreExceptionAfterUnload = null;
updateLogReplayRandomPause = null;
updateRandomPause = null;
randomDelayInCoreCreation = null;
for (Timer timer : timers) {
timer.cancel();
}
}
public static boolean injectRandomDelayInCoreCreation() {
if (randomDelayInCoreCreation != null) {
Pair<Boolean,Integer> pair = parseValue(randomDelayInCoreCreation);
boolean enabled = pair.getKey();
int chanceIn100 = pair.getValue();
if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
int delay = RANDOM.nextInt(randomDelayMaxInCoreCreationInSec);
log.info("Inject random core creation delay of {}s", delay);
try {
Thread.sleep(delay * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return true;
}
public static boolean injectNonGracefullClose(CoreContainer cc) {
if (cc.isShutDown() && nonGracefullClose != null) {
Pair<Boolean,Integer> pair = parseValue(nonGracefullClose);

View File

@ -70,6 +70,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,6 +94,11 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
// we randomly use a second config set rather than just one
private boolean secondConfigSet = random().nextBoolean();
@BeforeClass
public static void beforeCollectionsAPIDistributedZkTest() {
TestInjection.randomDelayInCoreCreation = "true:20";
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
@ -155,6 +161,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
waitForRecoveriesToFinish(false); // we need to fix no core tests still
testNodesUsedByCreate();
testNoConfigSetExist();
testCollectionsAPI();
@ -1273,8 +1280,9 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
MAX_SHARDS_PER_NODE, maxShardsPerNode,
NUM_SLICES, numShards);
Map<String,List<Integer>> collectionInfos = new HashMap<>();
createCollection(collectionInfos, COLL_NAME, props, client,"conf1");
waitForRecoveriesToFinish(COLL_NAME, false);
createCollection(collectionInfos, COLL_NAME, props, client, "conf1");
assertAllActive(COLL_NAME, getCommonCloudSolrClient().getZkStateReader());
}
private void clusterPropTest() throws Exception {