mirror of https://github.com/apache/lucene.git
SOLR-12730: Implement staggered SPLITSHARD requests in IndexSizeTrigger.
This commit is contained in:
parent
bfbe80472e
commit
a5972cedf8
|
@ -98,6 +98,8 @@ New Features
|
|||
|
||||
* SOLR-12791: Add Metrics reporting for AuthenticationPlugin (janhoy)
|
||||
|
||||
* SOLR-12730: Implement staggered SPLITSHARD requests in IndexSizeTrigger. (ab)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -26,15 +26,18 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
|
@ -682,6 +685,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
boolean firstReplicaNrt) {
|
||||
String splitKey = message.getStr("split.key");
|
||||
String rangesStr = message.getStr(CoreAdminParams.RANGES);
|
||||
String fuzzStr = message.getStr(CommonAdminParams.SPLIT_FUZZ, "0");
|
||||
float fuzz = 0.0f;
|
||||
try {
|
||||
fuzz = Float.parseFloat(fuzzStr);
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid numeric value of 'fuzz': " + fuzzStr);
|
||||
}
|
||||
|
||||
DocRouter.Range range = parentSlice.getRange();
|
||||
if (range == null) {
|
||||
|
@ -748,7 +758,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"A shard can only be split into "+MIN_NUM_SUB_SHARDS+" to " + MAX_NUM_SUB_SHARDS
|
||||
+ " subshards in one split request. Provided "+NUM_SUB_SHARDS+"=" + numSubShards);
|
||||
subRanges.addAll(router.partitionRange(numSubShards, range));
|
||||
subRanges.addAll(router.partitionRange(numSubShards, range, fuzz));
|
||||
}
|
||||
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
|
@ -763,23 +773,45 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
|
||||
public static boolean lockForSplit(SolrCloudManager cloudManager, String collection, String shard) throws Exception {
|
||||
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + "-splitting";
|
||||
if (cloudManager.getDistribStateManager().hasData(path)) {
|
||||
return false;
|
||||
final DistribStateManager stateManager = cloudManager.getDistribStateManager();
|
||||
synchronized (stateManager) {
|
||||
if (stateManager.hasData(path)) {
|
||||
VersionedData vd = stateManager.getData(path);
|
||||
return false;
|
||||
}
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
byte[] data = Utils.toJSON(map);
|
||||
try {
|
||||
cloudManager.getDistribStateManager().makePath(path, data, CreateMode.EPHEMERAL, true);
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Can't lock parent slice for splitting (another split operation running?): " +
|
||||
collection + "/" + shard, e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
byte[] data = Utils.toJSON(map);
|
||||
try {
|
||||
cloudManager.getDistribStateManager().makePath(path, data, CreateMode.EPHEMERAL, true);
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Can't lock parent slice for splitting (another split operation running?): " +
|
||||
collection + "/" + shard, e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static void unlockForSplit(SolrCloudManager cloudManager, String collection, String shard) throws Exception {
|
||||
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + "-splitting";
|
||||
cloudManager.getDistribStateManager().removeRecursively(path, true, true);
|
||||
if (shard != null) {
|
||||
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/" + shard + "-splitting";
|
||||
cloudManager.getDistribStateManager().removeRecursively(path, true, true);
|
||||
} else {
|
||||
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
|
||||
try {
|
||||
List<String> names = cloudManager.getDistribStateManager().listData(path);
|
||||
for (String name : cloudManager.getDistribStateManager().listData(path)) {
|
||||
if (name.endsWith("-splitting")) {
|
||||
try {
|
||||
cloudManager.getDistribStateManager().removeData(path + "/" + name, -1);
|
||||
} catch (NoSuchElementException nse) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (NoSuchElementException nse) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
public static final String BELOW_OP_PROP = "belowOp";
|
||||
public static final String COLLECTIONS_PROP = "collections";
|
||||
public static final String MAX_OPS_PROP = "maxOps";
|
||||
public static final String SPLIT_FUZZ_PROP = CommonAdminParams.SPLIT_FUZZ;
|
||||
public static final String SPLIT_METHOD_PROP = CommonAdminParams.SPLIT_METHOD;
|
||||
|
||||
public static final String BYTES_SIZE_PROP = "__bytes__";
|
||||
|
@ -80,6 +81,7 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
|
||||
private int maxOps;
|
||||
private SolrIndexSplitter.SplitMethod splitMethod;
|
||||
private float splitFuzz;
|
||||
private CollectionParams.CollectionAction aboveOp, belowOp;
|
||||
private final Set<String> collections = new HashSet<>();
|
||||
private final Map<String, Long> lastAboveEventMap = new ConcurrentHashMap<>();
|
||||
|
@ -89,7 +91,7 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
super(TriggerEventType.INDEXSIZE, name);
|
||||
TriggerUtils.validProperties(validProperties,
|
||||
ABOVE_BYTES_PROP, ABOVE_DOCS_PROP, BELOW_BYTES_PROP, BELOW_DOCS_PROP,
|
||||
COLLECTIONS_PROP, MAX_OPS_PROP, SPLIT_METHOD_PROP);
|
||||
COLLECTIONS_PROP, MAX_OPS_PROP, SPLIT_METHOD_PROP, SPLIT_FUZZ_PROP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -169,12 +171,18 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
} catch (Exception e) {
|
||||
throw new TriggerValidationException(getName(), MAX_OPS_PROP, "invalid value: '" + maxOpsStr + "': " + e.getMessage());
|
||||
}
|
||||
String methodStr = (String)properties.getOrDefault(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
|
||||
String methodStr = (String)properties.getOrDefault(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.LINK.toLower());
|
||||
splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
|
||||
if (splitMethod == null) {
|
||||
throw new TriggerValidationException(getName(), SPLIT_METHOD_PROP, "Unknown value '" + CommonAdminParams.SPLIT_METHOD +
|
||||
": " + methodStr);
|
||||
}
|
||||
String fuzzStr = String.valueOf(properties.getOrDefault(SPLIT_FUZZ_PROP, 0.0f));
|
||||
try {
|
||||
splitFuzz = Float.parseFloat(fuzzStr);
|
||||
} catch (Exception e) {
|
||||
throw new TriggerValidationException(getName(), SPLIT_FUZZ_PROP, "invalid value: '" + fuzzStr + "': " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -399,6 +407,9 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
|
||||
if (splitFuzz > 0) {
|
||||
params.put(CommonAdminParams.SPLIT_FUZZ, splitFuzz);
|
||||
}
|
||||
op.addHint(Suggester.Hint.PARAMS, params);
|
||||
ops.add(op);
|
||||
Long time = lastAboveEventMap.get(r.getCore());
|
||||
|
|
|
@ -145,6 +145,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
|
|||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.NUM_SUB_SHARDS;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.SPLIT_FUZZ;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
@ -641,6 +642,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
|
||||
String splitKey = req.getParams().get("split.key");
|
||||
String numSubShards = req.getParams().get(NUM_SUB_SHARDS);
|
||||
String fuzz = req.getParams().get(SPLIT_FUZZ);
|
||||
|
||||
if (splitKey == null && shard == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "At least one of shard, or split.key should be specified.");
|
||||
|
@ -657,6 +659,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"numSubShards can not be specified with split.key or ranges parameters");
|
||||
}
|
||||
if (fuzz != null && (splitKey != null || rangesStr != null)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST,
|
||||
"fuzz can not be specified with split.key or ranges parameters");
|
||||
}
|
||||
|
||||
Map<String, Object> map = copy(req.getParams(), null,
|
||||
COLLECTION_PROP,
|
||||
|
@ -666,7 +672,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
WAIT_FOR_FINAL_STATE,
|
||||
TIMING,
|
||||
SPLIT_METHOD,
|
||||
NUM_SUB_SHARDS);
|
||||
NUM_SUB_SHARDS,
|
||||
SPLIT_FUZZ);
|
||||
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
|
||||
}),
|
||||
DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {
|
||||
|
|
|
@ -174,7 +174,7 @@ public class SolrIndexSplitter {
|
|||
log.error("Original error closing IndexWriter:", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reopening IndexWriter after failed close", e1);
|
||||
}
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing current IndexWriter, aborting offline split...", e);
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing current IndexWriter, aborting 'link' split...", e);
|
||||
}
|
||||
}
|
||||
boolean success = false;
|
||||
|
@ -195,7 +195,7 @@ public class SolrIndexSplitter {
|
|||
t = timings.sub("parentApplyBufferedUpdates");
|
||||
ulog.applyBufferedUpdates();
|
||||
t.stop();
|
||||
log.info("Splitting in 'offline' mode " + (success? "finished" : "FAILED") +
|
||||
log.info("Splitting in 'link' mode " + (success? "finished" : "FAILED") +
|
||||
": re-opened parent IndexWriter.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -235,7 +235,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
// this should always be used - see filterParams
|
||||
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
|
||||
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD);
|
||||
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD,
|
||||
UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS);
|
||||
|
||||
CoreContainer cc = req.getCore().getCoreContainer();
|
||||
|
||||
|
|
|
@ -46,6 +46,6 @@
|
|||
</requestHandler>
|
||||
<indexConfig>
|
||||
<mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
|
||||
</indexConfig>
|
||||
: </indexConfig>
|
||||
</config>
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.solr.client.solrj.SolrServerException;
|
|||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -102,4 +104,33 @@ public class SplitShardTest extends SolrCloudTestCase {
|
|||
assertTrue("Expected SolrException but it didn't happen", expectedException);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitFuzz() throws Exception {
|
||||
String collectionName = "splitFuzzCollection";
|
||||
CollectionAdminRequest
|
||||
.createCollection(collectionName, "conf", 2, 1)
|
||||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(collectionName, 2, 2);
|
||||
|
||||
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
|
||||
.setSplitFuzz(0.5f)
|
||||
.setShardName("shard1");
|
||||
splitShard.process(cluster.getSolrClient());
|
||||
waitForState("Timed out waiting for sub shards to be active. Number of active shards=" +
|
||||
cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size(),
|
||||
collectionName, activeClusterShape(3, 4));
|
||||
DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
Slice s1_0 = coll.getSlice("shard1_0");
|
||||
Slice s1_1 = coll.getSlice("shard1_1");
|
||||
long fuzz = ((long)Integer.MAX_VALUE >> 3) + 1L;
|
||||
long delta0 = s1_0.getRange().max - s1_0.getRange().min;
|
||||
long delta1 = s1_1.getRange().max - s1_1.getRange().min;
|
||||
long expected0 = (Integer.MAX_VALUE >> 1) + fuzz;
|
||||
long expected1 = (Integer.MAX_VALUE >> 1) - fuzz;
|
||||
assertEquals("wrong range in s1_0", expected0, delta0);
|
||||
assertEquals("wrong range in s1_1", expected1, delta1);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -215,7 +215,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
}
|
||||
Map<String, Object> params = (Map<String, Object>)op.getHints().get(Suggester.Hint.PARAMS);
|
||||
assertNotNull("params are null: " + op, params);
|
||||
assertEquals("splitMethod: " + op, "rewrite", params.get(CommonAdminParams.SPLIT_METHOD));
|
||||
assertEquals("splitMethod: " + op, "link", params.get(CommonAdminParams.SPLIT_METHOD));
|
||||
}
|
||||
assertTrue("shard1 should be split", shard1);
|
||||
assertTrue("shard2 should be split", shard2);
|
||||
|
@ -529,7 +529,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
|
||||
int aboveBytes = maxSize * 2 / 3;
|
||||
|
||||
long waitForSeconds = 3 + random().nextInt(5);
|
||||
// need to wait for recovery after splitting
|
||||
long waitForSeconds = 10 + random().nextInt(5);
|
||||
|
||||
// the trigger is initially disabled so that we have time to add listeners
|
||||
// and have them capture all events once the trigger is enabled
|
||||
|
@ -588,6 +589,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
"'name' : 'index_size_trigger4'" +
|
||||
"}" +
|
||||
"}";
|
||||
log.info("-- resuming trigger");
|
||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
@ -596,6 +598,17 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
|
||||
boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("did not finish processing in time", await);
|
||||
log.info("-- suspending trigger");
|
||||
// suspend the trigger to avoid generating more events
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {" +
|
||||
"'name' : 'index_size_trigger4'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
assertEquals(1, listenerEvents.size());
|
||||
List<CapturedEvent> events = listenerEvents.get("capturing4");
|
||||
assertNotNull("'capturing4' events not found", events);
|
||||
|
@ -640,7 +653,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
finished = new CountDownLatch(1);
|
||||
|
||||
// suspend the trigger first so that we can safely delete all docs
|
||||
String suspendTriggerCommand = "{" +
|
||||
suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {" +
|
||||
"'name' : 'index_size_trigger4'" +
|
||||
"}" +
|
||||
|
@ -649,6 +662,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
log.info("-- deleting documents");
|
||||
for (int j = 0; j < 10; j++) {
|
||||
UpdateRequest ureq = new UpdateRequest();
|
||||
ureq.setParam("collection", collectionName);
|
||||
|
@ -657,6 +671,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
}
|
||||
solrClient.request(ureq);
|
||||
}
|
||||
cloudManager.getTimeSource().sleep(5000);
|
||||
// make sure the actual index size is reduced by deletions, otherwise we may still violate aboveBytes
|
||||
UpdateRequest ur = new UpdateRequest();
|
||||
ur.setParam(UpdateParams.COMMIT, "true");
|
||||
|
@ -665,8 +680,28 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
ur.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "1");
|
||||
ur.setParam(UpdateParams.WAIT_SEARCHER, "true");
|
||||
ur.setParam(UpdateParams.OPEN_SEARCHER, "true");
|
||||
log.info("-- requesting optimize / expungeDeletes / commit");
|
||||
solrClient.request(ur, collectionName);
|
||||
|
||||
// wait for the segments to merge to reduce the index size
|
||||
cloudManager.getTimeSource().sleep(50000);
|
||||
|
||||
// add some docs so that every shard gets an update
|
||||
// we can reduce the number of docs here but this also works
|
||||
for (int j = 0; j < 1; j++) {
|
||||
UpdateRequest ureq = new UpdateRequest();
|
||||
ureq.setParam("collection", collectionName);
|
||||
for (int i = 0; i < 98; i++) {
|
||||
ureq.add("id", "id-" + (i * 100) + "-" + j);
|
||||
}
|
||||
solrClient.request(ureq);
|
||||
}
|
||||
|
||||
log.info("-- requesting commit");
|
||||
solrClient.commit(collectionName, true, true);
|
||||
|
||||
// resume the trigger
|
||||
log.info("-- resuming trigger");
|
||||
// resume trigger
|
||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
|
@ -676,6 +711,12 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
|
||||
await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("did not finish processing in time", await);
|
||||
log.info("-- suspending trigger");
|
||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
// System.exit(-1);
|
||||
|
||||
assertEquals(1, listenerEvents.size());
|
||||
events = listenerEvents.get("capturing4");
|
||||
assertNotNull("'capturing4' events not found", events);
|
||||
|
|
|
@ -981,6 +981,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
sliceProperties.remove(collection);
|
||||
leaderThrottles.remove(collection);
|
||||
colShardReplicaMap.remove(collection);
|
||||
SplitShardCmd.unlockForSplit(cloudManager, collection, null);
|
||||
|
||||
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
|
||||
|
||||
|
@ -1034,6 +1035,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
|
||||
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
|
||||
});
|
||||
cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.COLLECTIONS_ZKNODE, true, false);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -1287,6 +1289,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
|
||||
sProps.remove(BUFFERED_UPDATES);
|
||||
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,19 +21,13 @@ import java.lang.invoke.MethodHandles;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -81,30 +75,6 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
super.setUp();
|
||||
}
|
||||
|
||||
protected void removeChildren(String path) throws Exception {
|
||||
|
||||
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
timeOut.waitFor("Timed out waiting to see core4 as leader", () -> { try {
|
||||
cluster.getDistribStateManager().removeRecursively(path, true, false);
|
||||
return true;
|
||||
} catch (NotEmptyException e) {
|
||||
|
||||
} catch (NoSuchElementException e) {
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (KeeperException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (BadVersionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/* Cluster helper methods ************************************/
|
||||
|
||||
/**
|
||||
|
|
|
@ -324,7 +324,7 @@ public class AutoScalingConfig implements MapWriter {
|
|||
empty = jsonMap.isEmpty();
|
||||
}
|
||||
|
||||
private AutoScalingConfig(Policy policy, Map<String, TriggerConfig> triggerConfigs, Map<String,
|
||||
public AutoScalingConfig(Policy policy, Map<String, TriggerConfig> triggerConfigs, Map<String,
|
||||
TriggerListenerConfig> listenerConfigs, Map<String, Object> properties, int zkVersion) {
|
||||
this.policy = policy;
|
||||
this.triggers = triggerConfigs != null ? Collections.unmodifiableMap(new LinkedHashMap<>(triggerConfigs)) : null;
|
||||
|
|
|
@ -47,7 +47,11 @@ class SplitShardSuggester extends Suggester {
|
|||
}
|
||||
Pair<String, String> collShard = shards.iterator().next();
|
||||
Map<String, Object> params = (Map<String, Object>)hints.getOrDefault(Hint.PARAMS, Collections.emptyMap());
|
||||
Float splitFuzz = (Float)params.get(CommonAdminParams.SPLIT_FUZZ);
|
||||
CollectionAdminRequest.SplitShard req = CollectionAdminRequest.splitShard(collShard.first()).setShardName(collShard.second());
|
||||
if (splitFuzz != null) {
|
||||
req.setSplitFuzz(splitFuzz);
|
||||
}
|
||||
String splitMethod = (String)params.get(CommonAdminParams.SPLIT_METHOD);
|
||||
if (splitMethod != null) {
|
||||
req.setSplitMethod(splitMethod);
|
||||
|
|
|
@ -1154,6 +1154,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
protected String shard;
|
||||
protected String splitMethod;
|
||||
protected Integer numSubShards;
|
||||
protected Float splitFuzz;
|
||||
|
||||
private Properties properties;
|
||||
|
||||
|
@ -1183,6 +1184,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
return splitMethod;
|
||||
}
|
||||
|
||||
public SplitShard setSplitFuzz(float splitFuzz) {
|
||||
this.splitFuzz = splitFuzz;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Float getSplitFuzz() {
|
||||
return splitFuzz;
|
||||
}
|
||||
|
||||
public SplitShard setSplitKey(String splitKey) {
|
||||
this.splitKey = splitKey;
|
||||
return this;
|
||||
|
@ -1220,8 +1230,12 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
params.set("split.key", this.splitKey);
|
||||
params.set(CoreAdminParams.RANGES, ranges);
|
||||
params.set(CommonAdminParams.SPLIT_METHOD, splitMethod);
|
||||
if(numSubShards != null)
|
||||
if(numSubShards != null) {
|
||||
params.set("numSubShards", numSubShards);
|
||||
}
|
||||
if (splitFuzz != null) {
|
||||
params.set(CommonAdminParams.SPLIT_FUZZ, String.valueOf(splitFuzz));
|
||||
}
|
||||
|
||||
if(properties != null) {
|
||||
addProperties(params, properties);
|
||||
|
|
|
@ -153,24 +153,53 @@ public abstract class DocRouter {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the range for each partition
|
||||
* Split the range into partitions.
|
||||
* @param partitions number of partitions
|
||||
* @param range range to split
|
||||
*/
|
||||
public List<Range> partitionRange(int partitions, Range range) {
|
||||
return partitionRange(partitions, range, 0.0f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Split the range into partitions with inexact sizes.
|
||||
* @param partitions number of partitions
|
||||
* @param range range to split
|
||||
* @param fuzz value between 0 (inclusive) and 0.5 (exclusive) indicating inexact split, i.e. percentage
|
||||
* of variation in resulting ranges - odd ranges will be larger and even ranges will be smaller
|
||||
* by up to that percentage.
|
||||
*/
|
||||
public List<Range> partitionRange(int partitions, Range range, float fuzz) {
|
||||
int min = range.min;
|
||||
int max = range.max;
|
||||
|
||||
assert max >= min;
|
||||
if (fuzz > 0.5f) {
|
||||
throw new IllegalArgumentException("'fuzz' parameter must be <= 0.5f but was " + fuzz);
|
||||
} else if (fuzz < 0.0f) {
|
||||
fuzz = 0.0f;
|
||||
}
|
||||
if (partitions == 0) return Collections.EMPTY_LIST;
|
||||
long rangeSize = (long)max - (long)min;
|
||||
long rangeStep = Math.max(1, rangeSize / partitions);
|
||||
long fuzzStep = Math.round(rangeStep * (double)fuzz / 2.0);
|
||||
|
||||
List<Range> ranges = new ArrayList<>(partitions);
|
||||
|
||||
long start = min;
|
||||
long end = start;
|
||||
boolean odd = true;
|
||||
|
||||
while (end < max) {
|
||||
end = start + rangeStep;
|
||||
if (fuzzStep > 0) {
|
||||
if (odd) {
|
||||
end = end + fuzzStep;
|
||||
} else {
|
||||
end = end - fuzzStep;
|
||||
}
|
||||
odd = !odd;
|
||||
}
|
||||
// make last range always end exactly on MAX_VALUE
|
||||
if (ranges.size() == partitions - 1) {
|
||||
end = max;
|
||||
|
|
|
@ -27,8 +27,10 @@ public interface CommonAdminParams
|
|||
String IN_PLACE_MOVE = "inPlaceMove";
|
||||
/** Method to use for shard splitting. */
|
||||
String SPLIT_METHOD = "splitMethod";
|
||||
/** **/
|
||||
/** Number of sub-shards to create. **/
|
||||
String NUM_SUB_SHARDS = "numSubShards";
|
||||
/** Timeout for replicas to become active. */
|
||||
String TIMEOUT = "timeout";
|
||||
/** Inexact shard splitting factor. */
|
||||
String SPLIT_FUZZ = "splitFuzz";
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue