SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (#2318)

This commit is contained in:
Ishan Chattopadhyaya 2021-02-13 01:10:35 +05:30
parent f7e42bdb35
commit 4b113067d8
8 changed files with 188 additions and 50 deletions

View File

@ -255,6 +255,9 @@ Bug Fixes
* SOLR-15136: Reduce excessive logging introduced with Per Replica States feature (Ishan Chattopadhyaya)
* SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections
(Mike Drob, Ilan Ginzburg, noble, Ishan Chattopadhyaya)
================== 8.8.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -16,8 +16,6 @@
*/
package org.apache.solr.cloud;
import static org.apache.solr.common.params.CommonParams.ID;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@ -29,8 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import com.codahale.metrics.Timer;
import org.apache.lucene.util.Version;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@ -78,7 +78,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
import static org.apache.solr.common.params.CommonParams.ID;
/**
* <p>Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
@ -145,6 +145,7 @@ public class Overseer implements SolrCloseable {
public static final int NUM_RESPONSES_TO_STORE = 10000;
public static final String OVERSEER_ELECT = "/overseer_elect";
private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
private SolrMetricsContext solrMetricsContext;
private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@ -254,8 +255,6 @@ public class Overseer implements SolrCloseable {
if (log.isDebugEnabled()) {
log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getZkStats().getQueueLength(), message);
}
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
try {
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
} catch (Exception e) {
@ -316,6 +315,13 @@ public class Overseer implements SolrCloseable {
processedNodes.add(head.first());
fallbackQueueSize = processedNodes.size();
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
while (unprocessedMessages.size() > 0) {
clusterState = zkStateWriter.writePendingUpdates();
Message m = unprocessedMessages.remove(0);
clusterState = m.run(clusterState, Overseer.this);
}
// The callback always be called on this thread
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
stateUpdateQueue.remove(processedNodes);
@ -1064,4 +1070,17 @@ public class Overseer implements SolrCloseable {
getStateUpdateQueue().offer(data);
}
/**
* Submit an intra-process message which will be picked up and executed when {@link ClusterStateUpdater}'s
* loop runs next time
*/
public void submit(Message message) {
unprocessedMessages.add(message);
}
public interface Message {
ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.data.Stat;
/**
* Refresh the Cluster State for a given collection
*
*/
public class RefreshCollectionMessage implements Overseer.Message {
public final String collection;
public RefreshCollectionMessage(String collection) {
this.collection = collection;
}
public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
if (stat == null) {
//collection does not exist
return clusterState.copyWith(collection, null);
}
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
//our state is up to date
return clusterState;
} else {
coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
return clusterState.copyWith(collection, coll);
}
}
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
@ -38,22 +39,15 @@ import org.apache.solr.client.solrj.cloud.NotEmptyException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.VersionedData;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.RefreshCollectionMessage;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
@ -109,6 +103,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
@ -128,8 +123,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// fail fast if parameters are wrong or incomplete
List<String> shardNames = populateShardNames(message, router);
checkReplicaTypes(message);
DocCollection newColl = null;
final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
try {
@ -150,6 +145,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
createCollectionZkNode(stateManager, collectionName, collectionParams);
if (isPRS) {
// In case of a PRS collection, create the collection structure directly instead of resubmitting
// to the overseer queue.
// TODO: Consider doing this for all collections, not just the PRS collections.
ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
// wait for a while until we see the collection
@ -166,10 +171,14 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// refresh cluster state
clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
newColl = clusterState.getCollection(collectionName);
}
List<ReplicaPosition> replicaPositions = null;
try {
replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName),
replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, newColl,
message, shardNames);
} catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
@ -213,7 +222,19 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
if (isPRS) {
// In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
// to the overseer queue.
// TODO: Consider doing this for all collections, not just the PRS collections.
ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
// log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
// Need to create new params for each request
ModifiableSolrParams params = new ModifiableSolrParams();
@ -246,7 +267,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
// wait for all replica entries to be created
Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
Map<String, Replica> replicas ;
if (isPRS) {
replicas = new ConcurrentHashMap<>();
newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coresToCreate.containsKey(r.getCoreName())) // Only the elements that were asked for...
.forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
} else {
replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
}
for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
ShardRequest sreq = e.getValue();
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
@ -256,6 +286,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
@SuppressWarnings({"rawtypes"})
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
if(isPRS) {
TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
while (!timeout.hasTimedOut()) {
if(prs.allActive()) break;
Thread.sleep(100);
prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
}
if (prs.allActive()) {
// we have successfully found all replicas to be ACTIVE
} else {
failure = true;
}
// Now ask Overseer to fetch the latest state of collection
// from ZK
ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
}
if (failure) {
// Let's cleanup as we hit an exception
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
@ -265,7 +312,6 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
throw new SolrException(ErrorCode.BAD_REQUEST, "Underlying core creation failed while creating collection: " + collectionName);
} else {
log.debug("Finished create command on all shards for collection: {}", collectionName);
// Emit a warning about production use of data driven functionality
boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null ||
message.getStr(COLL_CONF).equals(DEFAULT_CONFIGSET_NAME);

View File

@ -100,7 +100,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
assertThat(CollectionAdminRequest.listCollections(cloudClient), not(hasItem(collectionName)));
// Create a collection that would fail
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,"conf1",1,1);
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,"conf1",1,1).setPerReplicaState(random().nextBoolean());
Properties properties = new Properties();
Path tmpDir = createTempDir();

View File

@ -67,6 +67,8 @@ public class PerReplicaStates implements ReflectMapWriter {
@JsonProperty
public final SimpleMap<State> states;
private volatile Boolean allActive;
/**
* Construct with data read from ZK
* @param path path from where this is loaded
@ -92,6 +94,17 @@ public class PerReplicaStates implements ReflectMapWriter {
}
/** Check and return if all replicas are ACTIVE
*/
public boolean allActive() {
if (this.allActive != null) return allActive;
boolean[] result = new boolean[]{true};
states.forEachEntry((r, s) -> {
if (s.state != Replica.State.ACTIVE) result[0] = false;
});
return this.allActive = result[0];
}
/**Get the changed replicas
*/
public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {

View File

@ -69,9 +69,7 @@ public class PerReplicaStatesOps {
try {
zkClient.multi(ops, true);
} catch (KeeperException e) {
if(log.isErrorEnabled()) {
log.error("multi op exception " , e);
}
log.error("Multi-op exception: {}", zkClient.getChildren(znode, null, true));
throw e;
}
@ -119,7 +117,7 @@ public class PerReplicaStatesOps {
public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
PerReplicaStates.State existing = rs.get(replica);
PerReplicaStates.State existing = prs.get(replica);
if (existing == null) {
operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
} else {
@ -127,7 +125,7 @@ public class PerReplicaStatesOps {
addDeleteStaleNodes(operations, existing);
}
if (log.isDebugEnabled()) {
log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
}
return operations;
}).init(rs);
@ -163,7 +161,7 @@ public class PerReplicaStatesOps {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> ops = new ArrayList<>();
if (next != null) {
PerReplicaStates.State st = rs.get(next);
PerReplicaStates.State st = prs.get(next);
if (st != null) {
if (!st.isLeader) {
ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
@ -179,7 +177,7 @@ public class PerReplicaStatesOps {
// now go through all other replicas and unset previous leader
for (String r : allReplicas) {
PerReplicaStates.State st = rs.get(r);
PerReplicaStates.State st = prs.get(r);
if (st == null) continue;//unlikely
if (!Objects.equals(r, next)) {
if (st.isLeader) {
@ -190,7 +188,7 @@ public class PerReplicaStatesOps {
}
}
if (log.isDebugEnabled()) {
log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
}
return ops;
}).init(rs);
@ -204,10 +202,10 @@ public class PerReplicaStatesOps {
public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> result;
if (rs == null) {
if (prs == null) {
result = Collections.emptyList();
} else {
PerReplicaStates.State state = rs.get(replica);
PerReplicaStates.State state = prs.get(replica);
result = addDeleteStaleNodes(new ArrayList<>(), state);
}
return result;
@ -226,7 +224,7 @@ public class PerReplicaStatesOps {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> operations = new ArrayList<>();
for (String replica : replicas) {
PerReplicaStates.State r = rs.get(replica);
PerReplicaStates.State r = prs.get(replica);
if (r != null) {
if (r.state == Replica.State.DOWN && !r.isLeader) continue;
operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
@ -236,7 +234,7 @@ public class PerReplicaStatesOps {
}
}
if (log.isDebugEnabled()) {
log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
}
return operations;
}).init(rs);

View File

@ -1073,6 +1073,14 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
// Now let's do an add replica
CollectionAdminRequest
.addReplicaToShard(testCollection, "shard1")
.process(cluster.getSolrClient());
prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(5, prs.states.size());
testCollection = "perReplicaState_testv2";
new V2Request.Builder("/collections")
.withMethod(POST)