SOLR-13763: Improve the tracking of "freedisk" in autoscaling simulations.

Andrzej Bialecki 2019-09-18 19:18:47 +02:00
parent 369df12c2c
commit 6a8cfddf30
7 changed files with 253 additions and 57 deletions

solr/CHANGES.txt

@@ -161,6 +161,8 @@ Improvements
* SOLR-9658: Max idle time support for SolrCache implementations. (hoss, ab)
+* SOLR-13763: Improve the tracking of "freedisk" in autoscaling simulations. (ab)
Bug Fixes
----------------------

SimCloudManager.java

@@ -155,7 +155,7 @@ public class SimCloudManager implements SolrCloudManager {
private boolean useSystemCollection = true;
private static int nodeIdPort = 10000;
-  public static int DEFAULT_FREE_DISK = 1024; // 1000 GiB
+  public static int DEFAULT_FREE_DISK = 10240; // 10 TiB
+  public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
@@ -382,6 +382,10 @@ public class SimCloudManager implements SolrCloudManager {
return values;
}
+  public void disableMetricsHistory() {
+    metricsHistoryHandler.close();
+  }
public String dumpClusterState(boolean withCollections) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("#######################################\n");

SimClusterStateProvider.java

@@ -128,7 +128,7 @@ import static org.apache.solr.common.params.CommonParams.NAME;
public class SimClusterStateProvider implements ClusterStateProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final long DEFAULT_DOC_SIZE_BYTES = 500;
+  public static final long DEFAULT_DOC_SIZE_BYTES = 2048;
private static final String BUFFERED_UPDATES = "__buffered_updates__";
@@ -1527,17 +1527,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
ensureSystemCollection(collection);
DocCollection coll = getClusterState().getCollection(collection);
DocRouter router = coll.getRouter();
List<String> deletes = req.getDeleteById();
+    Map<String, AtomicLong> freediskDeltaPerNode = new HashMap<>();
if (deletes != null && !deletes.isEmpty()) {
+      Map<String, AtomicLong> deletesPerShard = new HashMap<>();
+      Map<String, Number> indexSizePerShard = new HashMap<>();
for (String id : deletes) {
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
throw new IOException("-- no leader in " + s);
}
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
ReplicaInfo ri = getReplicaInfo(leader);
@@ -1546,6 +1547,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
continue;
}
+        // this is somewhat wrong - we should wait until buffered updates are applied
+        // but this way the freedisk changes are much easier to track
+        s.getReplicas().forEach(r ->
+            freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
+                .addAndGet(DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
if (bufferedUpdates.get() > 0) {
@@ -1555,19 +1563,33 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
continue;
}
+        deletesPerShard.computeIfAbsent(s.getName(), slice -> new AtomicLong(0)).incrementAndGet();
+        Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
+        if (indexSize != null) {
+          indexSizePerShard.put(s.getName(), indexSize);
+        }
+      }
+      if (!deletesPerShard.isEmpty()) {
lock.lockInterruptibly();
try {
-          simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
-          simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
-          Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
-          if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
-            indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
-            simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
-                new AtomicLong(indexSize.longValue()), false, false);
-            simSetShardValue(collection, s.getName(), Variable.coreidxsize,
-                new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
-          } else {
-            throw new Exception("unexpected indexSize ri=" + ri);
+          for (Map.Entry<String, AtomicLong> entry : deletesPerShard.entrySet()) {
+            String shard = entry.getKey();
+            simSetShardValue(collection, shard, "SEARCHER.searcher.deletedDocs", entry.getValue().get(), true, false);
+            simSetShardValue(collection, shard, "SEARCHER.searcher.numDocs", -entry.getValue().get(), true, false);
+            Number indexSize = indexSizePerShard.get(shard);
+            long delSize = DEFAULT_DOC_SIZE_BYTES * entry.getValue().get();
+            if (indexSize != null) {
+              indexSize = indexSize.longValue() - delSize;
+              if (indexSize.longValue() < SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
+                indexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES;
+              }
+              simSetShardValue(collection, shard, Type.CORE_IDX.metricsAttribute,
+                  new AtomicLong(indexSize.longValue()), false, false);
+              simSetShardValue(collection, shard, Variable.coreidxsize,
+                  new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
+            } else {
+              throw new Exception("unexpected indexSize for collection=" + collection + ", shard=" + shard + ": " + indexSize);
+            }
+          }
} catch (Exception e) {
throw new IOException(e);
@@ -1582,11 +1604,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (!"*:*".equals(q)) {
throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
}
//log.debug("-- req delByQ " + collection);
for (Slice s : coll.getSlices()) {
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
throw new IOException("-- no leader in " + s);
}
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
@@ -1597,6 +1619,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
lock.lockInterruptibly();
try {
+          Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
+          if (indexSize != null) {
+            long delta = indexSize.longValue() < SimCloudManager.DEFAULT_IDX_SIZE_BYTES ? 0 :
+                indexSize.longValue() - SimCloudManager.DEFAULT_IDX_SIZE_BYTES;
+            s.getReplicas().forEach(r ->
+                freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
+                    .addAndGet(delta));
+          } else {
+            throw new RuntimeException("Missing index size in " + ri);
+          }
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
@@ -1626,6 +1658,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
if (docCount > 0) {
//log.debug("-- req update " + collection + " / " + docCount);
// this approach to updating counters and metrics drastically increases performance
// of bulk updates, because simSetShardValue is relatively costly
@@ -1672,13 +1705,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Slice s = slices[i];
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
throw new IOException("-- no leader in " + s);
}
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
.addAndGet(perSlice[i]);
modified = true;
+          long perSliceCount = perSlice[i];
+          s.getReplicas().forEach(r ->
+              freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
+                  .addAndGet(-perSliceCount * DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
bufferedUpdates.addAndGet(perSlice[i]);
@@ -1697,13 +1733,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
throw new IOException("-- no leader in " + s);
}
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
.incrementAndGet();
modified = true;
+          s.getReplicas().forEach(r ->
+              freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong())
+                  .addAndGet(-DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
bufferedUpdates.incrementAndGet();
@@ -1741,6 +1779,32 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.unlock();
}
}
+      if (!freediskDeltaPerNode.isEmpty()) {
+        SimNodeStateProvider nodeStateProvider = cloudManager.getSimNodeStateProvider();
+        freediskDeltaPerNode.forEach((node, delta) -> {
+          if (delta.get() == 0) {
+            return;
+          }
+          try {
+            // this method does its own locking to prevent races
+            nodeStateProvider.simUpdateNodeValue(node, Type.FREEDISK.tagName, val -> {
+              if (val == null) {
+                throw new RuntimeException("no freedisk for node " + node);
+              }
+              double freedisk = ((Number) val).doubleValue();
+              double deltaGB = (Double) Type.FREEDISK.convertVal(delta.get());
+              freedisk += deltaGB;
+              if (freedisk < 0) {
+                log.warn("-- freedisk=" + freedisk + " - ran out of disk space on node " + node);
+                freedisk = 0;
+              }
+              return freedisk;
+            });
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+      }
SolrParams params = req.getParams();
if (params != null && (params.getBool(UpdateParams.OPTIMIZE, false) || params.getBool(UpdateParams.EXPUNGE_DELETES, false))) {
lock.lockInterruptibly();

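Note: the deltas accumulated in freediskDeltaPerNode are in bytes, while the freedisk node value is tracked in GiB, so Type.FREEDISK.convertVal has to scale bytes down. A minimal, self-contained sketch of the arithmetic, assuming convertVal divides by 1024^3 (consistent with getDeltaFreeDiskBytes in the test below, which multiplies GiB back by 1024^3); the class and variable names here are invented for illustration:

    public class FreediskMathSketch {
      static final long DOC_SIZE_BYTES = 2048; // DEFAULT_DOC_SIZE_BYTES after this commit

      public static void main(String[] args) {
        long numDocs = 100_000;
        long deltaBytes = -numDocs * DOC_SIZE_BYTES;               // one bulk update, per replica
        double deltaGiB = deltaBytes / (1024.0 * 1024.0 * 1024.0); // assumed convertVal scaling
        double freedisk = 10240.0 + deltaGiB;                      // DEFAULT_FREE_DISK is 10 TiB per node
        if (freedisk < 0) {
          freedisk = 0;                                            // same clamp as the log.warn branch above
        }
        // deltaGiB is about -0.19 per replica; with replicationFactor=2 the
        // cluster-wide drop is about 0.38 GiB, which testFreediskTracking asserts.
        System.out.printf("deltaGiB=%.4f freedisk=%.4f%n", deltaGiB, freedisk);
      }
    }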
SimNodeStateProvider.java

@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -80,6 +81,23 @@ public class SimNodeStateProvider implements NodeStateProvider {
return values.get(key);
}
+  /**
+   * Atomically update a node value.
+   * @param node node id
+   * @param key property name
+   * @param updater updater function
+   * @return previous property value or null if property or node didn't exist.
+   */
+  public Object simUpdateNodeValue(String node, String key, Function<Object, Object> updater) throws InterruptedException {
+    lock.lockInterruptibly();
+    try {
+      Map<String, Object> values = nodeValues.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
+      return values.put(key, updater.apply(values.get(key)));
+    } finally {
+      lock.unlock();
+    }
+  }
/**
* Set node values.
* NOTE: if values contain 'nodeRole' key then /roles.json is updated.
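Note: a short usage sketch of the new method, mirroring how SimClusterStateProvider applies freedisk deltas above. Here nodeStateProvider and node are assumed to come from a running simulation, and unlike the caller above this variant tolerates a missing value:

    // Atomically subtract 0.5 GiB of freedisk from a node, clamping at zero.
    nodeStateProvider.simUpdateNodeValue(node, Type.FREEDISK.tagName, val -> {
      double freedisk = (val == null) ? 0.0 : ((Number) val).doubleValue();
      return Math.max(0.0, freedisk - 0.5);
    });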

FakeDocIterator.java (new file)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.util.Iterator;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
/**
* Lightweight generator of fake documents
* NOTE: this iterator only ever returns the same document N times, which works ok
* for our "bulk index update" simulation. Obviously don't use this for real indexing.
*/
public class FakeDocIterator implements Iterator<SolrInputDocument> {
final SolrInputDocument doc = new SolrInputDocument();
final SolrInputField idField = new SolrInputField("id");
final long start, count;
long current, max;
FakeDocIterator(long start, long count) {
this.start = start;
this.count = count;
current = start;
max = start + count;
doc.put("id", idField);
idField.setValue("foo");
}
@Override
public boolean hasNext() {
return current < max;
}
@Override
public SolrInputDocument next() {
current++;
return doc;
}
}
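Note: the iterator plugs into bulk updates through UpdateRequest.setDocIterator, exactly as testFreediskTracking below uses it:

    // Stream NUM_DOCS identical fake documents through a single update request.
    UpdateRequest ureq = new UpdateRequest();
    ureq.setDocIterator(new FakeDocIterator(0, NUM_DOCS));
    ureq.process(solrClient, collectionName);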

TestSimExtremeIndexing.java

@@ -17,7 +17,6 @@
package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
-import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
@@ -30,8 +29,6 @@ import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
@@ -144,36 +141,4 @@ public class TestSimExtremeIndexing extends SimSolrCloudTestCase {
solrClient.request(ureq);
}
-  // lightweight generator of fake documents
-  // NOTE: this iterator only ever returns the same document, which works ok
-  // for our "index update" simulation. Obviously don't use this for real indexing.
-  private static class FakeDocIterator implements Iterator<SolrInputDocument> {
-    final SolrInputDocument doc = new SolrInputDocument();
-    final SolrInputField idField = new SolrInputField("id");
-    final long start, count;
-    long current, max;
-    FakeDocIterator(long start, long count) {
-      this.start = start;
-      this.count = count;
-      current = start;
-      max = start + count;
-      doc.put("id", idField);
-      idField.setValue("foo");
-    }
-    @Override
-    public boolean hasNext() {
-      return current < max;
-    }
-    @Override
-    public SolrInputDocument next() {
-      current++;
-      return doc;
-    }
-  }
}

TestSimLargeCluster.java

@@ -19,7 +19,10 @@ package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -29,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.lucene.util.TestUtil;
@@ -36,7 +40,9 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.autoscaling.ActionContext;
@@ -48,10 +54,13 @@ import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After;
@@ -88,6 +97,9 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase {
public void setupTest() throws Exception {
configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
+    // disable metrics history collection
+    cluster.disableMetricsHistory();
// disable .scheduled_maintenance (once it exists)
CloudTestUtils.waitForTriggerToBeScheduled(cluster, ".scheduled_maintenance");
CloudTestUtils.suspendTrigger(cluster, ".scheduled_maintenance");
@@ -752,4 +764,79 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase {
assertEquals("shard1", hint.second());
});
}
+  @Test
+  public void testFreediskTracking() throws Exception {
+    int NUM_DOCS = 100000;
+    String collectionName = "testFreeDisk";
+    SolrClient solrClient = cluster.simGetSolrClient();
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf",2, 2);
+    create.process(solrClient);
+    CloudUtil.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
+        collectionName, CloudUtil.clusterShape(2, 2, false, true));
+    ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Set<String> nodes = coll.getReplicas().stream()
+        .map(r -> r.getNodeName())
+        .collect(Collectors.toSet());
+    Map<String, Number> initialFreedisk = getFreeDiskPerNode(nodes);
+    // test small updates
+    for (int i = 0; i < NUM_DOCS; i++) {
+      SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
+      solrClient.add(collectionName, doc);
+    }
+    Map<String, Number> updatedFreedisk = getFreeDiskPerNode(nodes);
+    double delta = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk);
+    // 2 replicas - twice as much delta
+    assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2, delta, delta * 0.1);
+    // test small deletes - delete half of docs
+    for (int i = 0; i < NUM_DOCS / 2; i++) {
+      solrClient.deleteById(collectionName, "id-" + i);
+    }
+    Map<String, Number> updatedFreedisk1 = getFreeDiskPerNode(nodes);
+    double delta1 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk1);
+    // 2 replicas but half the docs
+    assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2 / 2, delta1, delta1 * 0.1);
+    // test bulk delete
+    solrClient.deleteByQuery(collectionName, "*:*");
+    Map<String, Number> updatedFreedisk2 = getFreeDiskPerNode(nodes);
+    double delta2 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk2);
+    // 0 docs - initial freedisk
+    log.info(cluster.dumpClusterState(true));
+    assertEquals(0.0, delta2, delta2 * 0.1);
+    // test bulk update
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.setDocIterator(new FakeDocIterator(0, NUM_DOCS));
+    ureq.process(solrClient, collectionName);
+    Map<String, Number> updatedFreedisk3 = getFreeDiskPerNode(nodes);
+    double delta3 = getDeltaFreeDiskBytes(initialFreedisk, updatedFreedisk3);
+    assertEquals(SimClusterStateProvider.DEFAULT_DOC_SIZE_BYTES * NUM_DOCS * 2, delta3, delta3 * 0.1);
+  }
+  private double getDeltaFreeDiskBytes(Map<String, Number> initial, Map<String, Number> updated) {
+    double deltaGB = 0;
+    for (String node : initial.keySet()) {
+      double before = initial.get(node).doubleValue();
+      double after = updated.get(node).doubleValue();
+      assertTrue("freedisk after=" + after + " not smaller than before=" + before, after <= before);
+      deltaGB += before - after;
+    }
+    return deltaGB * 1024.0 * 1024.0 * 1024.0;
+  }
+  private Map<String, Number> getFreeDiskPerNode(Collection<String> nodes) throws Exception {
+    Map<String, Number> freediskPerNode = new HashMap<>();
+    for (String node : nodes) {
+      Map<String, Object> values = cluster.getNodeStateProvider().getNodeValues(node, Arrays.asList(Variable.Type.FREEDISK.tagName));
+      freediskPerNode.put(node, (Number) values.get(Variable.Type.FREEDISK.tagName));
+    }
+    log.info("- freeDiskPerNode: " + Utils.toJSONString(freediskPerNode));
+    return freediskPerNode;
+  }
}
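Note: a quick sanity check of the deltas the test expects, in bytes: 2048 bytes/doc * 100,000 docs * 2 replicas = 409,600,000 bytes (about 0.38 GiB) after the small-updates loop and again after the bulk update; half that after deleting half the docs; and roughly 0 after deleteByQuery, since the per-replica index size is clamped at DEFAULT_IDX_SIZE_BYTES rather than going negative.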