mirror of https://github.com/apache/lucene.git
SOLR-12729: SplitShardCmd should lock the parent shard to prevent parallel splitting requests.
This commit is contained in:
@ -182,6 +182,8 @@ Bug Fixes
* SOLR-12836: ZkController creates a cloud solr client with no connection or read timeouts. Now the http client
created by the update shard handler is used instead. (shalin)
* SOLR-12729: SplitShardCmd should lock the parent shard to prevent parallel splitting requests. (ab)
@ -61,6 +61,7 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@ -116,7 +117,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
if (parentSlice.getState() != Slice.State.ACTIVE) {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Parent slice is not active: " + parentSlice.getState());
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Parent slice is not active: " +
collectionName + "/ " + parentSlice.getName() + ", state=" + parentSlice.getState());
// find the leader for the shard
@ -172,6 +174,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// check for the lock
if (!lockForSplit(ocmh.cloudManager, collectionName, parentSlice.getName())) {
// mark as success to avoid clearing the lock in the "finally" block
success = true;
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Can't lock parent slice for splitting (another split operation running?): " +
collectionName + "/" + parentSlice.getName());
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
t = timings.sub("fillRanges");
@ -502,6 +512,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
results.add(CommonParams.TIMING, timings.asNamedList());
success = true;
// don't unlock the shard yet - the lock is released only after the final switch-over in
// ReplicaMutator completes (whether it succeeds or fails)
return true;
} catch (SolrException e) {
throw e;
@ -512,6 +524,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (sessionWrapper != null) sessionWrapper.release();
if (!success) {
cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices, offlineSlices);
unlockForSplit(ocmh.cloudManager, collectionName, parentSlice.getName());
@ -740,4 +753,26 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
return rangesStr;
public static boolean lockForSplit(SolrCloudManager cloudManager, String collection, String shard) throws Exception {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + "-splitting";
if (cloudManager.getDistribStateManager().hasData(path)) {
return false;
Map<String, Object> map = new HashMap<>();
map.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
byte[] data = Utils.toJSON(map);
try {
cloudManager.getDistribStateManager().makePath(path, data, CreateMode.EPHEMERAL, true);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Can't lock parent slice for splitting (another split operation running?): " +
collection + "/" + shard, e);
return true;
public static void unlockForSplit(SolrCloudManager cloudManager, String collection, String shard) throws Exception {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + "-splitting";
cloudManager.getDistribStateManager().removeRecursively(path, true, true);
@ -21,7 +21,9 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@ -29,6 +31,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +77,7 @@ public class InactiveShardPlanAction extends TriggerActionBase {
ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
Map<String, List<String>> cleanup = new LinkedHashMap<>();
Map<String, List<String>> inactive = new LinkedHashMap<>();
Map<String, Map<String, Object>> staleLocks = new LinkedHashMap<>();
state.forEachCollection(coll ->
coll.getSlices().forEach(s -> {
if (Slice.State.INACTIVE.equals(s.getState())) {
@ -94,12 +98,54 @@ public class InactiveShardPlanAction extends TriggerActionBase {
cleanup.computeIfAbsent(coll.getName(), c -> new ArrayList<>()).add(s.getName());
// check for stale shard split locks
String parentPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll.getName();
List<String> locks;
try {
locks = cloudManager.getDistribStateManager().listData(parentPath).stream()
.filter(name -> name.endsWith("-splitting"))
for (String lock : locks) {
try {
String lockPath = parentPath + "/" + lock;
Map<String, Object> lockData = Utils.getJson(cloudManager.getDistribStateManager(), lockPath);
String tstampStr = (String)lockData.get(ZkStateReader.STATE_TIMESTAMP_PROP);
if (tstampStr == null || tstampStr.isEmpty()) {
long timestamp = Long.parseLong(tstampStr);
// this timestamp uses epoch time
long currentTime = cloudManager.getTimeSource().getEpochTimeNs();
long delta = TimeUnit.NANOSECONDS.toSeconds(currentTime - timestamp);
log.debug("{}/{}: locktstamp={}, time={}, delta={}", coll.getName(), lock, timestamp, currentTime, delta);
if (delta > cleanupTTL) {
log.debug("-- delete inactive split lock for {}/{}, delta={}", coll.getName(), lock, delta);
cloudManager.getDistribStateManager().removeData(lockPath, -1);
lockData.put("currentTimeNs", currentTime);
lockData.put("deltaSec", delta);
lockData.put("ttlSec", cleanupTTL);
staleLocks.put(coll.getName() + "/" + lock, lockData);
} else {
log.debug("-- lock " + coll.getName() + "/" + lock + " still active (delta=" + delta + ")");
} catch (NoSuchElementException nse) {
// already removed by someone else - ignore
} catch (Exception e) {
log.warn("Exception checking for inactive shard split locks in " + parentPath, e);
if (!cleanup.isEmpty()) {
Map<String, Object> results = new LinkedHashMap<>();
if (!cleanup.isEmpty()) {
results.put("inactive", inactive);
results.put("cleanup", cleanup);
if (!staleLocks.isEmpty()) {
results.put("staleLocks", staleLocks);
if (!results.isEmpty()) {
context.getProperties().put(getName(), results);
@ -35,6 +35,7 @@ import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -476,6 +478,13 @@ public class ReplicaMutator {
propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
try {
SplitShardCmd.unlockForSplit(cloudManager, collection.getName(), parentSliceName);
} catch (Exception e) {
log.warn("Failed to unlock shard after " + (isLeaderSame ? "" : "un") + "successful split: {} / {}",
collection.getName(), parentSliceName);
ZkNodeProps m = new ZkNodeProps(propMap);
return new SliceMutator(cloudManager).updateShardState(prevState, m).collection;
@ -49,12 +49,12 @@ public class SliceMutator {
public static final Set<String> SLICE_UNIQUE_BOOLEAN_PROPERTIES = ImmutableSet.of(PREFERRED_LEADER_PROP);
protected final SolrCloudManager dataProvider;
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
public SliceMutator(SolrCloudManager dataProvider) {
this.dataProvider = dataProvider;
this.stateManager = dataProvider.getDistribStateManager();
public SliceMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@ -140,9 +140,9 @@ public class SliceMutator {
String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(dataProvider).unsetLeader(replica);
replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
} else if (coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(dataProvider).setLeader(replica);
replica = new ReplicaMutator(cloudManager).setLeader(replica);
newReplicas.put(replica.getName(), replica);
@ -179,7 +179,7 @@ public class SliceMutator {
props.put(ZkStateReader.STATE_PROP, message.getStr(key));
// we need to use epoch time so that it's comparable across Overseer restarts
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(dataProvider.getTimeSource().getEpochTimeNs()));
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
slicesCopy.put(slice.getName(), newSlice);
@ -24,6 +24,8 @@ import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -135,6 +137,8 @@ public class TestInjection {
public static String splitFailureAfterReplicaCreation = null;
public static CountDownLatch splitLatch = null;
public static String waitForReplicasInSync = "true:60";
public static String failIndexFingerprintRequests = null;
@ -159,6 +163,7 @@ public class TestInjection {
randomDelayInCoreCreation = null;
splitFailureBeforeReplicaCreation = null;
splitFailureAfterReplicaCreation = null;
splitLatch = null;
prepRecoveryOpPauseForever = null;
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
waitForReplicasInSync = "true:60";
@ -413,6 +418,18 @@ public class TestInjection {
return injectSplitFailure(splitFailureAfterReplicaCreation, "after creating replica for sub-shard");
public static boolean injectSplitLatch() {
if (splitLatch != null) {
try {
log.info("Waiting in ReplicaMutator for up to 60s");
return splitLatch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return true;
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) {
if (waitForReplicasInSync == null) return true;
@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
@ -60,10 +61,12 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
@ -540,6 +543,64 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
public void testSplitLocking() throws Exception {
String collectionName = "testSplitLocking";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
waitForRecoveriesToFinish(collectionName, false);
TestInjection.splitLatch = new CountDownLatch(1); // simulate a long split operation
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/" + SHARD1 + "-splitting";
final AtomicReference<Exception> exc = new AtomicReference<>();
try {
Runnable r = () -> {
try {
trySplit(collectionName, null, SHARD1, 1);
} catch (Exception e) {
Thread t = new Thread(r);
// wait for the split to start executing
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
if (cloudClient.getZkStateReader().getZkClient().exists(path, true)) {
log.info("=== found lock node");
assertFalse("timed out waiting for the lock znode to appear", timeOut.hasTimedOut());
assertNull("unexpected exception: " + exc.get(), exc.get());
log.info("=== trying second split");
try {
trySplit(collectionName, null, SHARD1, 1);
fail("expected to fail due to locking but succeeded");
} catch (Exception e) {
log.info("Expected failure: " + e.toString());
// make sure the lock still exists
assertTrue("lock znode expected but missing", cloudClient.getZkStateReader().getZkClient().exists(path, true));
// let the first split proceed
timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
if (!cloudClient.getZkStateReader().getZkClient().exists(path, true)) {
assertFalse("timed out waiting for the lock znode to disappear", timeOut.hasTimedOut());
} finally {
public void testSplitShardWithRule() throws Exception {
@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -35,10 +36,13 @@ import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -167,6 +171,17 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
CloudTestUtils.waitForState(cloudManager, "failed to create " + collection1, collection1,
CloudTestUtils.clusterShape(1, 1));
// also create a very stale lock
Map<String, Object> lockData = new HashMap<>();
lockData.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs() -
TimeUnit.NANOSECONDS.convert(48, TimeUnit.HOURS)));
String staleLockName = collection1 + "/staleShard-splitting";
cloudManager.getDistribStateManager().makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" +
staleLockName, Utils.toJSON(lockData), CreateMode.EPHEMERAL, true);
// expect two events - one for a very stale lock, one for the cleanup
triggerFired = new CountDownLatch(2);
String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
@ -186,10 +201,10 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
"'set-trigger' : {" +
"'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
"'event' : 'scheduled'," +
"'startTime' : 'NOW+3SECONDS'," +
"'startTime' : 'NOW+10SECONDS'," +
"'every' : '+2SECONDS'," +
"'enabled' : true," +
"'actions' : [{'name' : 'inactive_shard_plan', 'class' : 'solr.InactiveShardPlanAction', 'ttl' : '10'}," +
"'actions' : [{'name' : 'inactive_shard_plan', 'class' : 'solr.InactiveShardPlanAction', 'ttl' : '20'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}," +
"{'name' : 'test', 'class' : '" + TestTriggerAction.class.getName() + "'}]" +
@ -208,7 +223,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
CloudTestUtils.clusterShape(3, 1, true, true));
await = triggerFired.await(60, TimeUnit.SECONDS);
await = triggerFired.await(90, TimeUnit.SECONDS);
assertTrue("cleanup action didn't run", await);
// cleanup should have occurred
@ -217,21 +232,27 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
int inactiveEvents = 0;
CapturedEvent ce = null;
CapturedEvent staleLock = null;
for (CapturedEvent e : events) {
if (e.stage != TriggerEventProcessorStage.AFTER_ACTION) {
if (e.context.containsKey("properties.inactive_shard_plan")) {
Map<String, Object> plan = (Map<String, Object>)e.context.get("properties.inactive_shard_plan");
if (plan == null) {
if (plan.containsKey("cleanup")) {
ce = e;
} else {
// capture only the first
if (plan.containsKey("staleLocks") && staleLock == null) {
staleLock = e;
assertTrue("should be at least one inactive event", inactiveEvents > 0);
assertNotNull("missing cleanup event", ce);
assertNotNull("missing cleanup event: " + events, ce);
assertNotNull("missing staleLocks event: " + events, staleLock);
Map<String, Object> map = (Map<String, Object>)ce.context.get("properties.inactive_shard_plan");
@ -242,6 +263,12 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
assertEquals(1, cleanup.size());
map = (Map<String, Object>)staleLock.context.get("properties.inactive_shard_plan");
Map<String, Map<String, Object>> locks = (Map<String, Map<String, Object>>)map.get("staleLocks");
assertTrue("missing stale lock data: " + locks + "\nevents: " + events, locks.containsKey(staleLockName));
ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
CloudTestUtils.clusterShape(2, 1).matches(state.getLiveNodes(), state.getCollection(collection1));
@ -1151,12 +1151,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
" / " + sliceName.get() + " has no leader and can't be split");
SplitShardCmd.lockForSplit(cloudManager, collectionName, sliceName.get());
// start counting buffered updates
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
if (props.containsKey(BUFFERED_UPDATES)) {
log.debug("--- SOLR-12729: Overlapping splitShard commands for {} / {}", collectionName, sliceName.get());
throw new Exception("--- SOLR-12729: Overlapping splitShard commands for " + collectionName + "/" + sliceName.get());
props.put(BUFFERED_UPDATES, new AtomicLong());
@ -1240,6 +1240,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// delay it once again to better simulate replica recoveries
//opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
boolean success = false;
try {
CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
for (String subSlice : subSlices) {
Slice s = state.getSlice(subSlice);
@ -1254,6 +1256,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return true;
success = true;
} finally {
if (!success) {
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
// mark the new slices as active and the old slice as inactive
log.trace("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
sliceName.get(), subSlices);
@ -1292,6 +1300,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// invalidate cached state
} finally {
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
results.add("success", "");
@ -66,6 +66,9 @@ public class TestSimExtremeIndexing extends SimSolrCloudTestCase {
// tweak this threshold to test the number of splits
private static final long ABOVE_SIZE = 20000000;
// tweak this to allow more operations in one event
private static final int MAX_OPS = 100;
private static TimeSource timeSource;
private static SolrClient solrClient;
@ -100,6 +103,7 @@ public class TestSimExtremeIndexing extends SimSolrCloudTestCase {
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'aboveDocs' : " + ABOVE_SIZE + "," +
"'maxOps' : " + MAX_OPS + "," +
"'enabled' : true," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
Reference in New Issue