mirror of https://github.com/apache/lucene.git
SOLR-12709: Add TestSimExtremeIndexing for testing simulated large indexing jobs.
Several important improvements to the simulator.
This commit is contained in:
parent
c587410f99
commit
2369c89634
|
@ -136,7 +136,7 @@ public class ComputePlanAction extends TriggerActionBase {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
log.info("Computed Plan: {}", operation.getParams());
|
||||
log.debug("Computed Plan: {}", operation.getParams());
|
||||
if (!collections.isEmpty()) {
|
||||
String coll = operation.getParams().get(CoreAdminParams.COLLECTION);
|
||||
if (coll != null && !collections.contains(coll)) {
|
||||
|
@ -175,7 +175,11 @@ public class ComputePlanAction extends TriggerActionBase {
|
|||
clusterState.forEachCollection(coll -> {
|
||||
Integer rf = coll.getReplicationFactor();
|
||||
if (rf == null) {
|
||||
rf = coll.getReplicas().size() / coll.getSlices().size();
|
||||
// Guard: with no slices, the integer division in the else-branch would divide by zero.
if (coll.getSlices().isEmpty()) {
|
||||
rf = 1; // placeholder value (originally marked "???"); it is multiplied by the slice count (0) when added to totalRF
|
||||
} else {
|
||||
// derive the replication factor as replicas per slice (integer division)
rf = coll.getReplicas().size() / coll.getSlices().size();
|
||||
}
|
||||
}
|
||||
totalRF.addAndGet(rf * coll.getSlices().size());
|
||||
});
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -431,8 +432,15 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
Map<String, List<ReplicaInfo>> belowSize) {
|
||||
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
|
||||
properties.put(TriggerEvent.REQUESTED_OPS, ops);
|
||||
properties.put(ABOVE_SIZE_PROP, aboveSize);
|
||||
properties.put(BELOW_SIZE_PROP, belowSize);
|
||||
// avoid passing very large amounts of data here - just use replica names
|
||||
TreeMap<String, String> above = new TreeMap<>();
|
||||
aboveSize.forEach((coll, replicas) ->
|
||||
replicas.forEach(r -> above.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_PROP) + ", bytes=" + r.getVariable(BYTES_SIZE_PROP))));
|
||||
properties.put(ABOVE_SIZE_PROP, above);
|
||||
TreeMap<String, String> below = new TreeMap<>();
|
||||
belowSize.forEach((coll, replicas) ->
|
||||
replicas.forEach(r -> below.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_PROP) + ", bytes=" + r.getVariable(BYTES_SIZE_PROP))));
|
||||
properties.put(BELOW_SIZE_PROP, below);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -791,7 +791,7 @@ public class SolrMetricManager {
|
|||
*/
|
||||
public static String getRegistryName(SolrInfoBean.Group group, String... names) {
|
||||
String fullName;
|
||||
String prefix = REGISTRY_NAME_PREFIX + group.toString() + ".";
|
||||
String prefix = new StringBuilder(REGISTRY_NAME_PREFIX).append(group.name()).append('.').toString();
|
||||
// check for existing prefix and group
|
||||
if (names != null && names.length > 0 && names[0] != null && names[0].startsWith(prefix)) {
|
||||
// assume the first segment already was expanded
|
||||
|
|
|
@ -109,7 +109,7 @@ public class CloudTestUtils {
|
|||
log.trace("-- still not matching predicate: {}", state);
|
||||
}
|
||||
}
|
||||
throw new TimeoutException("last state: " + coll);
|
||||
throw new TimeoutException("last ClusterState: " + state + ", last coll state: " + coll);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -141,13 +141,13 @@ public class CloudTestUtils {
|
|||
}
|
||||
Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
|
||||
if (slices.size() != expectedShards) {
|
||||
log.trace("-- wrong number of active slices, expected={}, found={}", expectedShards, collectionState.getSlices().size());
|
||||
log.trace("-- wrong number of slices, expected={}, found={}: {}", expectedShards, collectionState.getSlices().size(), collectionState.getSlices());
|
||||
return false;
|
||||
}
|
||||
Set<String> leaderless = new HashSet<>();
|
||||
for (Slice slice : slices) {
|
||||
int activeReplicas = 0;
|
||||
if (requireLeaders && slice.getLeader() == null) {
|
||||
if (requireLeaders && slice.getState() != Slice.State.INACTIVE && slice.getLeader() == null) {
|
||||
leaderless.add(slice.getName());
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
if (random().nextBoolean() || true) {
|
||||
if (random().nextBoolean()) {
|
||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
||||
solrClient = cluster.getSolrClient();
|
||||
loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
|
||||
|
@ -190,7 +190,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
assertNotNull("should have fired an event", ev);
|
||||
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.getProperty(TriggerEvent.REQUESTED_OPS);
|
||||
assertNotNull("should contain requestedOps", ops);
|
||||
assertEquals("number of ops", 2, ops.size());
|
||||
assertEquals("number of ops: " + ops, 2, ops.size());
|
||||
boolean shard1 = false;
|
||||
boolean shard2 = false;
|
||||
for (TriggerEvent.Op op : ops) {
|
||||
|
@ -361,7 +361,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
|
||||
CloudTestUtils.clusterShape(2, 2, false, true));
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
for (int i = 0; i < 20; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
|
||||
solrClient.add(collectionName, doc);
|
||||
}
|
||||
|
@ -412,7 +412,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// delete some docs to trigger a merge
|
||||
for (int i = 0; i < 5; i++) {
|
||||
for (int i = 0; i < 15; i++) {
|
||||
solrClient.deleteById(collectionName, "id-" + (i * 100));
|
||||
}
|
||||
solrClient.commit(collectionName);
|
||||
|
@ -425,7 +425,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
assertEquals("success", response.get("result").toString());
|
||||
|
||||
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
|
||||
|
||||
|
|
|
@ -31,12 +31,14 @@ import java.util.Random;
|
|||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
|
||||
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
|
||||
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
|
||||
|
@ -50,6 +52,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
|||
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
|
||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
|
@ -118,6 +121,7 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
private final String metricTag;
|
||||
|
||||
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
|
||||
private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
|
||||
private final MockSearchableSolrClient solrClient;
|
||||
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
|
||||
|
||||
|
@ -129,9 +133,11 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
private MetricsHandler metricsHandler;
|
||||
private MetricsHistoryHandler metricsHistoryHandler;
|
||||
private TimeSource timeSource;
|
||||
private boolean useSystemCollection = true;
|
||||
|
||||
private static int nodeIdPort = 10000;
|
||||
public static int DEFAULT_DISK = 1024; // 1024 GiB = 1 TiB
|
||||
public static int DEFAULT_FREE_DISK = 1024; // 1024 GiB = 1 TiB
|
||||
public static int DEFAULT_TOTAL_DISK = 10240; // 10240 GiB = 10 TiB
|
||||
public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10240 bytes = 10 KiB
|
||||
|
||||
/**
|
||||
|
@ -201,7 +207,14 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
request = new QueryRequest(params);
|
||||
} else {
|
||||
// search request
|
||||
return super.request(request, collection);
|
||||
if (collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
return super.request(request, collection);
|
||||
} else {
|
||||
// forward it
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
|
||||
params.set("collection", collection);
|
||||
request = new QueryRequest(params);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection);
|
||||
|
@ -306,7 +319,8 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
values.put(ImplicitSnitch.PORT, port);
|
||||
values.put(ImplicitSnitch.NODE, nodeId);
|
||||
values.put(ImplicitSnitch.CORES, 0);
|
||||
values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
|
||||
values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK);
|
||||
values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK);
|
||||
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
|
||||
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
|
||||
values.put("sysprop.java.version", System.getProperty("java.version"));
|
||||
|
@ -353,7 +367,13 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
Set<String> deadNodes = getSimNodeStateProvider().simGetDeadNodes();
|
||||
sb.append("## Dead nodes:\t\t" + deadNodes.size() + "\n");
|
||||
deadNodes.forEach(n -> sb.append("##\t\t" + n + "\n"));
|
||||
sb.append("## Collections:\t" + getSimClusterStateProvider().simListCollections() + "\n");
|
||||
sb.append("## Collections:\n");
|
||||
clusterStateProvider.simGetCollectionStats().forEach((coll, stats) -> {
|
||||
sb.append("## * ").append(coll).append('\n');
|
||||
stats.forEach((k, v) -> {
|
||||
sb.append("## " + k + "\t" + v + "\n");
|
||||
});
|
||||
});
|
||||
if (withCollections) {
|
||||
ClusterState state = clusterStateProvider.getClusterState();
|
||||
state.forEachCollection(coll -> sb.append(coll.toString() + "\n"));
|
||||
|
@ -385,6 +405,13 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
return loader;
|
||||
}
|
||||
|
||||
/**
 * Get the source of randomness (usually initialized by the test suite).
 * <p>The instance is obtained from {@link RandomizedContext#current()} on every call.</p>
 * <p>NOTE(review): this presumably requires the calling thread to be running under a
 * randomized test context - confirm before calling from unrelated threads.</p>
 * @return the {@link Random} of the current {@link RandomizedContext}
 */
|
||||
public Random getRandom() {
|
||||
return RandomizedContext.current().getRandom();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new node and initialize its node values (metrics). The
|
||||
* /live_nodes list is updated with the new node id.
|
||||
|
@ -448,6 +475,10 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Enable or disable storing of documents in the (simulated) .system collection.
 * <p>The flag is enabled by default. It only records the requested setting here;
 * the update-handling code consults it before appending documents to the
 * in-memory .system collection list.</p>
 * @param useSystemCollection true to keep storing documents, false to stop storing them
 */
public void simSetUseSystemCollection(boolean useSystemCollection) {
|
||||
this.useSystemCollection = useSystemCollection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the (simulated) .system collection.
|
||||
*/
|
||||
|
@ -464,17 +495,7 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
}
|
||||
|
||||
public Map<String, Map<String, AtomicInteger>> simGetEventCounts() {
|
||||
TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
|
||||
synchronized (systemColl) {
|
||||
for (SolrInputDocument d : systemColl) {
|
||||
if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
|
||||
continue;
|
||||
}
|
||||
counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
|
||||
.computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
|
||||
.incrementAndGet();
|
||||
}
|
||||
}
|
||||
TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts);
|
||||
return counts;
|
||||
}
|
||||
|
||||
|
@ -705,6 +726,9 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
rsp.setResponse(queryResponse.getValues());
|
||||
log.trace("-- response: {}", rsp);
|
||||
return rsp;
|
||||
} else if (req instanceof QueryRequest) {
|
||||
incrementCount("query");
|
||||
return clusterStateProvider.simQuery((QueryRequest)req);
|
||||
}
|
||||
}
|
||||
if (req instanceof UpdateRequest) {
|
||||
|
@ -715,7 +739,17 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
List<SolrInputDocument> docs = ureq.getDocuments();
|
||||
if (docs != null) {
|
||||
systemColl.addAll(docs);
|
||||
if (useSystemCollection) {
|
||||
systemColl.addAll(docs);
|
||||
}
|
||||
for (SolrInputDocument d : docs) {
|
||||
if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
|
||||
continue;
|
||||
}
|
||||
eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
|
||||
.computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
|
||||
.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return new UpdateResponse();
|
||||
} else {
|
||||
|
|
|
@ -27,20 +27,25 @@ import java.util.EnumMap;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
|
||||
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
|
@ -51,7 +56,9 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
|
|||
import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.cloud.ActionThrottle;
|
||||
import org.apache.solr.cloud.CloudTestUtils;
|
||||
|
@ -65,6 +72,7 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
|
|||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cloud.overseer.CollectionMutator;
|
||||
import org.apache.solr.cloud.overseer.ZkWriteCommand;
|
||||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -79,6 +87,7 @@ import org.apache.solr.common.cloud.rule.ImplicitSnitch;
|
|||
import org.apache.solr.common.params.CollectionAdminParams;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.CommonAdminParams;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
|
@ -118,11 +127,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
public static final long DEFAULT_DOC_SIZE_BYTES = 500;
|
||||
|
||||
private static final String BUFFERED_UPDATES = "__buffered_updates__";
|
||||
|
||||
private final LiveNodesSet liveNodes;
|
||||
private final SimDistribStateManager stateManager;
|
||||
private final SimCloudManager cloudManager;
|
||||
|
||||
private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
|
||||
private final Map<String, Map<String, List<ReplicaInfo>>> colShardReplicaMap = new ConcurrentHashMap<>();
|
||||
private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
|
||||
private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
|
||||
private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
|
||||
|
@ -145,6 +157,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
|
||||
private AtomicBoolean saveClusterState = new AtomicBoolean();
|
||||
|
||||
private Random bulkUpdateRandom = new Random(0);
|
||||
|
||||
private transient boolean closed;
|
||||
|
||||
/**
|
||||
|
@ -226,6 +240,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
.computeIfAbsent(shard, s -> new ActionThrottle("leader", 5000, cloudManager.getTimeSource()));
|
||||
}
|
||||
|
||||
/**
 * Get random node id, using the source of randomness provided by
 * {@code cloudManager.getRandom()}.
 * <p>Convenience overload that delegates to {@code simGetRandomNode(Random)}.</p>
 * @return one of the live nodes
 */
|
||||
public String simGetRandomNode() {
|
||||
return simGetRandomNode(cloudManager.getRandom());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get random node id.
|
||||
* @param random instance of random.
|
||||
|
@ -506,12 +528,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
|
||||
// add a property expected in Policy calculations, if missing
|
||||
if (replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute) == null) {
|
||||
replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, SimCloudManager.DEFAULT_IDX_SIZE_BYTES);
|
||||
replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
|
||||
replicaInfo.getVariables().put(Variable.coreidxsize,
|
||||
Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
|
||||
new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)));
|
||||
}
|
||||
|
||||
replicas.add(replicaInfo);
|
||||
colShardReplicaMap.computeIfAbsent(replicaInfo.getCollection(), c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(replicaInfo.getShard(), s -> new ArrayList<>())
|
||||
.add(replicaInfo);
|
||||
|
||||
Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
|
||||
.computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
|
||||
|
@ -523,7 +548,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
|
||||
Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
|
||||
if (disk == null) {
|
||||
disk = SimCloudManager.DEFAULT_DISK;
|
||||
disk = SimCloudManager.DEFAULT_FREE_DISK;
|
||||
}
|
||||
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
|
||||
// fake metrics
|
||||
|
@ -533,7 +558,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests");
|
||||
cloudManager.getMetricManager().registry(registry).counter("QUERY./select.requests");
|
||||
cloudManager.getMetricManager().registerGauge(null, registry,
|
||||
() -> replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute),
|
||||
() -> ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue(),
|
||||
"", true, "INDEX.sizeInBytes");
|
||||
// at this point nuke our cached DocCollection state
|
||||
collectionsStatesRef.set(null);
|
||||
|
@ -559,6 +584,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
for (int i = 0; i < replicas.size(); i++) {
|
||||
if (coreNodeName.equals(replicas.get(i).getName())) {
|
||||
ReplicaInfo ri = replicas.remove(i);
|
||||
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
|
||||
.remove(ri);
|
||||
collectionsStatesRef.set(null);
|
||||
|
||||
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
|
||||
|
@ -598,6 +626,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
int version = oldData != null ? oldData.getVersion() : -1;
|
||||
Assert.assertEquals(clusterStateVersion, version + 1);
|
||||
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
|
||||
log.debug("** saved cluster state version " + version);
|
||||
clusterStateVersion++;
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
|
@ -635,15 +664,22 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
return;
|
||||
}
|
||||
dc.getSlices().forEach(s -> {
|
||||
if (s.getState() == Slice.State.INACTIVE) {
|
||||
log.trace("-- slice state is {}, skip leader election {} / {}", s.getState(), dc.getName(), s.getName());
|
||||
return;
|
||||
}
|
||||
if (s.getState() != Slice.State.ACTIVE) {
|
||||
log.trace("-- slice state is {}, but I will run leader election {} / {}", s.getState(), dc.getName(), s.getName());
|
||||
}
|
||||
if (s.getLeader() != null) {
|
||||
log.debug("-- already has leader {} / {}", dc.getName(), s.getName());
|
||||
log.trace("-- already has leader {} / {}", dc.getName(), s.getName());
|
||||
return;
|
||||
}
|
||||
if (s.getReplicas().isEmpty()) {
|
||||
log.debug("-- no replicas in {} / {}", dc.getName(), s.getName());
|
||||
log.trace("-- no replicas in {} / {}", dc.getName(), s.getName());
|
||||
return;
|
||||
}
|
||||
log.debug("-- submit leader election for {} / {}", dc.getName(), s.getName());
|
||||
log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
|
||||
cloudManager.submit(() -> {
|
||||
simRunLeaderElection(dc.getName(), s, saveClusterState);
|
||||
return true;
|
||||
|
@ -656,9 +692,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
|
||||
log.debug("Running leader election for {} / {}", collection, s.getName());
|
||||
log.trace("Running leader election for {} / {}", collection, s.getName());
|
||||
if (s.getReplicas().isEmpty()) { // no replicas - punt
|
||||
log.debug("-- no replicas in {} / {}", collection, s.getName());
|
||||
log.trace("-- no replicas in {} / {}", collection, s.getName());
|
||||
return;
|
||||
}
|
||||
ActionThrottle lt = getThrottle(collection, s.getName());
|
||||
|
@ -692,7 +728,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
});
|
||||
if (alreadyHasLeader.get()) {
|
||||
log.debug("-- already has leader {} / {}: {}", collection, s.getName(), s);
|
||||
log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
|
||||
return;
|
||||
}
|
||||
if (active.isEmpty()) {
|
||||
|
@ -718,11 +754,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
synchronized (ri) {
|
||||
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
|
||||
}
|
||||
log.debug("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
|
||||
stateChanged.set(true);
|
||||
log.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri.getName());
|
||||
}
|
||||
} else {
|
||||
log.debug("-- already has leader for {} / {}", collection, s.getName());
|
||||
log.trace("-- already has leader for {} / {}", collection, s.getName());
|
||||
}
|
||||
if (stateChanged.get() || saveState) {
|
||||
collectionsStatesRef.set(null);
|
||||
|
@ -810,9 +846,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
collection.getReplicas().size() + 1);
|
||||
try {
|
||||
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", 0);
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", 0);
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
|
||||
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
|
||||
coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
|
||||
cloudManager.submit(() -> {
|
||||
|
@ -833,9 +869,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
replicaNum.getAndIncrement());
|
||||
try {
|
||||
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", 0);
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", 0);
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
|
||||
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
|
||||
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
|
||||
cloudManager.submit(() -> {
|
||||
|
@ -900,6 +936,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
collProperties.remove(collection);
|
||||
sliceProperties.remove(collection);
|
||||
leaderThrottles.remove(collection);
|
||||
colShardReplicaMap.remove(collection);
|
||||
|
||||
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
|
||||
|
||||
|
@ -942,6 +979,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
lock.lockInterruptibly();
|
||||
try {
|
||||
nodeReplicaMap.clear();
|
||||
colShardReplicaMap.clear();
|
||||
collProperties.clear();
|
||||
sliceProperties.clear();
|
||||
leaderThrottles.clear();
|
||||
|
@ -1086,12 +1124,24 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
sliceName.set(message.getStr(SHARD_ID_PROP));
|
||||
String splitKey = message.getStr("split.key");
|
||||
|
||||
// always invalidate cached collection states to get up-to-date metrics
|
||||
collectionsStatesRef.set(null);
|
||||
|
||||
ClusterState clusterState = getClusterState();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
|
||||
Replica leader = parentSlice.getLeader();
|
||||
// XXX leader election may not have happened yet - should we require it?
|
||||
if (leader == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
|
||||
" / " + sliceName.get() + " has no leader and can't be split");
|
||||
}
|
||||
// start counting buffered updates
|
||||
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
|
||||
if (props.containsKey(BUFFERED_UPDATES)) {
|
||||
log.debug("--- SOLR-12729: Overlapping splitShard commands for {} / {}", collectionName, sliceName.get());
|
||||
return;
|
||||
}
|
||||
props.put(BUFFERED_UPDATES, new AtomicLong());
|
||||
|
||||
List<DocRouter.Range> subRanges = new ArrayList<>();
|
||||
List<String> subSlices = new ArrayList<>();
|
||||
List<String> subShardNames = new ArrayList<>();
|
||||
|
@ -1117,12 +1167,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
if (sessionWrapper != null) sessionWrapper.release();
|
||||
|
||||
// adjust numDocs / deletedDocs / maxDoc
|
||||
Replica leader = parentSlice.getLeader();
|
||||
// XXX leader election may not have happened yet - should we require it?
|
||||
if (leader == null) {
|
||||
leader = parentSlice.getReplicas().iterator().next();
|
||||
}
|
||||
String numDocsStr = leader.getStr("SEARCHER.searcher.numDocs", "0");
|
||||
String numDocsStr = String.valueOf(getReplicaInfo(leader).getVariable("SEARCHER.searcher.numDocs", "0"));
|
||||
long numDocs = Long.parseLong(numDocsStr);
|
||||
long newNumDocs = numDocs / subSlices.size();
|
||||
long remainderDocs = numDocs % subSlices.size();
|
||||
|
@ -1130,10 +1175,23 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
long remainderIndexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES + remainderDocs * DEFAULT_DOC_SIZE_BYTES;
|
||||
String remainderSlice = null;
|
||||
|
||||
// add slice props
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
String subSlice = subSlices.get(i);
|
||||
DocRouter.Range range = subRanges.get(i);
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
|
||||
sliceProps.put(Slice.RANGE, range);
|
||||
sliceProps.put(Slice.PARENT, sliceName.get());
|
||||
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
|
||||
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
}
|
||||
// add replicas
|
||||
for (ReplicaPosition replicaPosition : replicaPositions) {
|
||||
String subSliceName = replicaPosition.shard;
|
||||
String subShardNodeName = replicaPosition.node;
|
||||
String solrCoreName = collectionName + "_" + subSliceName + "_replica" + (replicaPosition.index);
|
||||
// String solrCoreName = collectionName + "_" + subSliceName + "_replica_n" + (replicaPosition.index);
|
||||
String solrCoreName = Assign.buildSolrCoreName(collectionName, subSliceName, replicaPosition.type, Assign.incAndGetId(stateManager, collectionName, 0));
|
||||
Map<String, Object> replicaProps = new HashMap<>();
|
||||
replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
|
||||
replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
|
||||
|
@ -1149,43 +1207,75 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
replicasNumDocs += remainderDocs;
|
||||
replicasIndexSize += remainderIndexSize;
|
||||
}
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", replicasNumDocs);
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", replicasNumDocs);
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
|
||||
replicaProps.put(Type.CORE_IDX.metricsAttribute, replicasIndexSize);
|
||||
replicaProps.put(Variable.coreidxsize, Type.CORE_IDX.convertVal(replicasIndexSize));
|
||||
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(replicasNumDocs));
|
||||
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(replicasNumDocs));
|
||||
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
|
||||
replicaProps.put(Type.CORE_IDX.metricsAttribute, new AtomicLong(replicasIndexSize));
|
||||
replicaProps.put(Variable.coreidxsize, new AtomicDouble((Double)Type.CORE_IDX.convertVal(replicasIndexSize)));
|
||||
|
||||
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
|
||||
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
|
||||
simAddReplica(replicaPosition.node, ri, false);
|
||||
}
|
||||
// mark the old slice as inactive
|
||||
simRunLeaderElection(Collections.singleton(collectionName), true);
|
||||
|
||||
// delay it once again to better simulate replica recoveries
|
||||
//opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
|
||||
|
||||
CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
|
||||
for (String subSlice : subSlices) {
|
||||
Slice s = state.getSlice(subSlice);
|
||||
if (s.getLeader() == null) {
|
||||
log.debug("** no leader in {} / {}", collectionName, s);
|
||||
return false;
|
||||
}
|
||||
if (s.getReplicas().size() < repFactor) {
|
||||
log.debug("** expected {} repFactor but there are {} replicas", repFactor, s.getReplicas().size());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
// mark the new slices as active and the old slice as inactive
|
||||
log.trace("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
|
||||
sliceName.get(), subSlices);
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
|
||||
props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
|
||||
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
sProps.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
|
||||
sProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
AtomicLong bufferedUpdates = (AtomicLong)sProps.remove(BUFFERED_UPDATES);
|
||||
if (bufferedUpdates.get() > 0) {
|
||||
// apply buffered updates
|
||||
long perShard = bufferedUpdates.get() / subSlices.size();
|
||||
long remainder = bufferedUpdates.get() % subSlices.size();
|
||||
log.debug("-- applying {} buffered docs from {} / {}, perShard={}, remainder={}", bufferedUpdates.get(),
|
||||
collectionName, parentSlice.getName(), perShard, remainder);
|
||||
for (int i = 0; i < subSlices.size(); i++) {
|
||||
String sub = subSlices.get(i);
|
||||
long numUpdates = perShard;
|
||||
if (i == 0) {
|
||||
numUpdates += remainder;
|
||||
}
|
||||
simSetShardValue(collectionName, sub, "SEARCHER.searcher.numDocs", numUpdates, true, false);
|
||||
simSetShardValue(collectionName, sub, "SEARCHER.searcher.maxDoc", numUpdates, true, false);
|
||||
}
|
||||
}
|
||||
// XXX also mark replicas as down? currently SplitShardCmd doesn't do this
|
||||
|
||||
for (String s : subSlices) {
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(s, ss -> new ConcurrentHashMap<>());
|
||||
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
|
||||
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
}
|
||||
|
||||
// invalidate cached state
|
||||
collectionsStatesRef.set(null);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
// add slice props
|
||||
for (int i = 0; i < subRanges.size(); i++) {
|
||||
String subSlice = subSlices.get(i);
|
||||
DocRouter.Range range = subRanges.get(i);
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
|
||||
sliceProps.put(Slice.RANGE, range);
|
||||
sliceProps.put(Slice.PARENT, sliceName.get());
|
||||
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
|
||||
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
|
||||
}
|
||||
collectionsStatesRef.set(null);
|
||||
simRunLeaderElection(Collections.singleton(collectionName), true);
|
||||
results.add("success", "");
|
||||
|
||||
}
|
||||
|
@ -1216,7 +1306,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
|
||||
sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
|
||||
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
Iterator<ReplicaInfo> it = replicas.iterator();
|
||||
while (it.hasNext()) {
|
||||
|
@ -1237,7 +1328,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
public void createSystemCollection() throws IOException {
|
||||
try {
|
||||
if (simListCollections().contains(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
return;
|
||||
}
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
|
@ -1278,7 +1369,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
if (collection == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
|
||||
}
|
||||
if (!simListCollections().contains(collection)) {
|
||||
if (!colShardReplicaMap.containsKey(collection)) {
|
||||
if (CollectionAdminParams.SYSTEM_COLL.equals(collection)) {
|
||||
// auto-create
|
||||
createSystemCollection();
|
||||
|
@ -1286,126 +1377,257 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
|
||||
}
|
||||
}
|
||||
// always reset first to get the current metrics - it's easier than to keep matching
|
||||
// Replica with ReplicaInfo where the current real counts are stored
|
||||
collectionsStatesRef.set(null);
|
||||
|
||||
DocCollection coll = getClusterState().getCollection(collection);
|
||||
DocRouter router = coll.getRouter();
|
||||
|
||||
boolean modified = false;
|
||||
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
List<String> deletes = req.getDeleteById();
|
||||
if (deletes != null && !deletes.isEmpty()) {
|
||||
for (String id : deletes) {
|
||||
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
|
||||
// NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
|
||||
List<String> deletes = req.getDeleteById();
|
||||
if (deletes != null && !deletes.isEmpty()) {
|
||||
for (String id : deletes) {
|
||||
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
log.debug("-- no leader in " + s);
|
||||
continue;
|
||||
}
|
||||
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
|
||||
ReplicaInfo ri = getReplicaInfo(leader);
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
|
||||
if (numDocs == null || numDocs.intValue() <= 0) {
|
||||
log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
|
||||
continue;
|
||||
}
|
||||
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
|
||||
if (bufferedUpdates != null) {
|
||||
if (bufferedUpdates.get() > 0) {
|
||||
bufferedUpdates.decrementAndGet();
|
||||
} else {
|
||||
log.debug("-- attempting to delete nonexistent buffered doc " + id + " from " + s.getLeader());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
|
||||
Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
|
||||
if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
|
||||
indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
|
||||
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
|
||||
new AtomicLong(indexSize.longValue()), false, false);
|
||||
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
|
||||
new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
|
||||
} else {
|
||||
throw new Exception("unexpected indexSize ri=" + ri);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
deletes = req.getDeleteQuery();
|
||||
if (deletes != null && !deletes.isEmpty()) {
|
||||
for (String q : deletes) {
|
||||
if (!"*:*".equals(q)) {
|
||||
throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
|
||||
}
|
||||
for (Slice s : coll.getSlices()) {
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
log.debug("-- no leader in " + s);
|
||||
continue;
|
||||
}
|
||||
|
||||
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
|
||||
ReplicaInfo ri = getReplicaInfo(leader);
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
|
||||
if (numDocs == null || numDocs.intValue() <= 0) {
|
||||
log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
|
||||
if (numDocs == null || numDocs.intValue() == 0) {
|
||||
continue;
|
||||
}
|
||||
modified = true;
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
|
||||
Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
|
||||
if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
|
||||
indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
|
||||
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
|
||||
indexSize.intValue(), false, false);
|
||||
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
|
||||
Type.CORE_IDX.convertVal(indexSize), false, false);
|
||||
} else {
|
||||
throw new Exception("unexpected indexSize ri=" + ri);
|
||||
}
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
|
||||
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
|
||||
new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
|
||||
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
|
||||
new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
deletes = req.getDeleteQuery();
|
||||
if (deletes != null && !deletes.isEmpty()) {
|
||||
for (String q : deletes) {
|
||||
if (!"*:*".equals(q)) {
|
||||
throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
|
||||
}
|
||||
List<SolrInputDocument> docs = req.getDocuments();
|
||||
int docCount = 0;
|
||||
Iterator<SolrInputDocument> it = null;
|
||||
if (docs != null) {
|
||||
docCount = docs.size();
|
||||
} else {
|
||||
it = req.getDocIterator();
|
||||
if (it != null) {
|
||||
while (it.hasNext()) {
|
||||
it.next();
|
||||
docCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (docCount > 0) {
|
||||
// this approach to updating counters and metrics drastically increases performance
|
||||
// of bulk updates, because simSetShardValue is relatively costly
|
||||
|
||||
Map<String, AtomicLong> docUpdates = new HashMap<>();
|
||||
Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
|
||||
|
||||
// XXX don't add more than 2bln docs in one request
|
||||
boolean modified = false;
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
coll = getClusterState().getCollection(collection);
|
||||
Slice[] slices = coll.getActiveSlicesArr();
|
||||
if (slices.length == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
|
||||
}
|
||||
int[] perSlice = new int[slices.length];
|
||||
|
||||
if (it != null) {
|
||||
// BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
|
||||
// which adds significant overhead
|
||||
|
||||
int totalAdded = 0;
|
||||
for (int i = 0; i < slices.length; i++) {
|
||||
Slice s = slices[i];
|
||||
long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
|
||||
perSlice[i] = (int) count;
|
||||
totalAdded += perSlice[i];
|
||||
}
|
||||
for (Slice s : coll.getSlices()) {
|
||||
// loss of precision due to integer math
|
||||
int diff = docCount - totalAdded;
|
||||
if (diff > 0) {
|
||||
// spread the remainder more or less equally
|
||||
int perRemain = diff / slices.length;
|
||||
int remainder = diff % slices.length;
|
||||
int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
|
||||
for (int i = 0; i < slices.length; i++) {
|
||||
perSlice[i] += perRemain;
|
||||
if (i == remainderSlice) {
|
||||
perSlice[i] += remainder;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < slices.length; i++) {
|
||||
Slice s = slices[i];
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
log.debug("-- no leader in " + s);
|
||||
continue;
|
||||
}
|
||||
|
||||
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
|
||||
ReplicaInfo ri = getReplicaInfo(leader);
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
|
||||
if (numDocs == null || numDocs.intValue() == 0) {
|
||||
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
|
||||
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
|
||||
.addAndGet(perSlice[i]);
|
||||
modified = true;
|
||||
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
|
||||
if (bufferedUpdates != null) {
|
||||
bufferedUpdates.addAndGet(perSlice[i]);
|
||||
continue;
|
||||
}
|
||||
modified = true;
|
||||
try {
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", numDocs, false, false);
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 0, false, false);
|
||||
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
|
||||
SimCloudManager.DEFAULT_IDX_SIZE_BYTES, false, false);
|
||||
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
|
||||
Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
|
||||
.addAndGet(perSlice[i]);
|
||||
}
|
||||
} else {
|
||||
// SMALL UPDATE: use exact assignment via DocRouter
|
||||
for (SolrInputDocument doc : docs) {
|
||||
String id = (String) doc.getFieldValue("id");
|
||||
if (id == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
|
||||
}
|
||||
Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
log.debug("-- no leader in " + s);
|
||||
continue;
|
||||
}
|
||||
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
|
||||
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
|
||||
.incrementAndGet();
|
||||
modified = true;
|
||||
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
|
||||
if (bufferedUpdates != null) {
|
||||
bufferedUpdates.incrementAndGet();
|
||||
continue;
|
||||
}
|
||||
docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
|
||||
.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
List<SolrInputDocument> docs = req.getDocuments();
|
||||
if (docs != null && !docs.isEmpty()) {
|
||||
for (SolrInputDocument doc : docs) {
|
||||
String id = (String) doc.getFieldValue("id");
|
||||
if (id == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
|
||||
}
|
||||
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
log.debug("-- no leader in " + s);
|
||||
continue;
|
||||
}
|
||||
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
|
||||
modified = true;
|
||||
try {
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 1, true, false);
|
||||
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.maxDoc", 1, true, false);
|
||||
|
||||
ReplicaInfo ri = getReplicaInfo(leader);
|
||||
Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
|
||||
// for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
|
||||
indexSize = indexSize.longValue() + DEFAULT_DOC_SIZE_BYTES;
|
||||
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
|
||||
indexSize.longValue(), false, false);
|
||||
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
|
||||
Type.CORE_IDX.convertVal(indexSize), false, false);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (modified) {
|
||||
docUpdates.forEach((sh, count) -> {
|
||||
try {
|
||||
simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
|
||||
simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
|
||||
// for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
|
||||
simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
|
||||
DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
|
||||
simSetShardValue(collection, sh, Variable.coreidxsize,
|
||||
Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
metricUpdates.forEach((sh, cores) -> {
|
||||
cores.forEach((core, count) -> {
|
||||
String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
|
||||
Utils.parseMetricsReplicaName(collection, core));
|
||||
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
|
||||
});
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
if (modified) {
|
||||
collectionsStatesRef.set(null);
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return new UpdateResponse();
|
||||
}
|
||||
|
||||
public QueryResponse simQuery(QueryRequest req) throws SolrException, InterruptedException, IOException {
|
||||
ensureNotClosed();
|
||||
String collection = req.getCollection();
|
||||
if (collection == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
|
||||
}
|
||||
if (!colShardReplicaMap.containsKey(collection)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection does not exist");
|
||||
}
|
||||
String query = req.getParams().get(CommonParams.Q);
|
||||
if (query == null || !query.equals("*:*")) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only '*:*' query is supported");
|
||||
}
|
||||
ClusterState clusterState = getClusterState();
|
||||
DocCollection coll = clusterState.getCollection(collection);
|
||||
AtomicLong count = new AtomicLong();
|
||||
for (Slice s : coll.getActiveSlicesArr()) {
|
||||
Replica r = s.getLeader();
|
||||
if (r == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, collection + "/" + s.getName() + " has no leader");
|
||||
}
|
||||
ReplicaInfo ri = getReplicaInfo(r);
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs", 0L);
|
||||
count.addAndGet(numDocs.longValue());
|
||||
}
|
||||
QueryResponse rsp = new QueryResponse();
|
||||
NamedList<Object> values = new NamedList<>();
|
||||
values.add("responseHeader", new NamedList<>());
|
||||
SolrDocumentList docs = new SolrDocumentList();
|
||||
docs.setNumFound(count.get());
|
||||
values.add("response", docs);
|
||||
rsp.setResponse(values);
|
||||
return rsp;
|
||||
}
|
||||
|
||||
private static String createRegistryName(String collection, String shard, Replica r) {
|
||||
return SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, shard,
|
||||
Utils.parseMetricsReplicaName(collection, r.getCoreName()));
|
||||
|
@ -1572,17 +1794,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
* divided by the number of replicas.
|
||||
*/
|
||||
public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
|
||||
List<ReplicaInfo> infos = new ArrayList<>();
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
replicas.forEach(r -> {
|
||||
if (r.getCollection().equals(collection)) {
|
||||
if (shard != null && !shard.equals(r.getShard())) {
|
||||
return;
|
||||
}
|
||||
infos.add(r);
|
||||
}
|
||||
});
|
||||
});
|
||||
final List<ReplicaInfo> infos;
|
||||
if (shard == null) {
|
||||
infos = new ArrayList<>();
|
||||
colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
|
||||
.forEach((sh, replicas) -> infos.addAll(replicas));
|
||||
} else {
|
||||
infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(shard, s -> new ArrayList<>());
|
||||
}
|
||||
if (infos.isEmpty()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist (shard=" + shard + ").");
|
||||
}
|
||||
|
@ -1602,22 +1822,50 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
Object prevValue = r.getVariables().get(key);
|
||||
if (prevValue != null) {
|
||||
if ((prevValue instanceof Number) && (value instanceof Number)) {
|
||||
if (((prevValue instanceof Long) || (prevValue instanceof Integer)) &&
|
||||
if (((prevValue instanceof Long) || (prevValue instanceof Integer) ||
|
||||
(prevValue instanceof AtomicLong) || (prevValue instanceof AtomicInteger)) &&
|
||||
((value instanceof Long) || (value instanceof Integer))) {
|
||||
Long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
|
||||
r.getVariables().put(key, newValue);
|
||||
long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
|
||||
// minimize object allocations
|
||||
if (prevValue instanceof AtomicLong) {
|
||||
((AtomicLong)prevValue).set(newValue);
|
||||
} else if (prevValue instanceof AtomicInteger) {
|
||||
((AtomicInteger)prevValue).set(((Number)prevValue).intValue() + ((Number)value).intValue());
|
||||
} else {
|
||||
r.getVariables().put(key, newValue);
|
||||
}
|
||||
} else {
|
||||
Double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
|
||||
r.getVariables().put(key, newValue);
|
||||
double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
|
||||
if (prevValue instanceof AtomicDouble) {
|
||||
((AtomicDouble)prevValue).set(newValue);
|
||||
} else {
|
||||
r.getVariables().put(key, newValue);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
|
||||
}
|
||||
} else {
|
||||
r.getVariables().put(key, value);
|
||||
if (value instanceof Integer) {
|
||||
r.getVariables().put(key, new AtomicInteger((Integer)value));
|
||||
} else if (value instanceof Long) {
|
||||
r.getVariables().put(key, new AtomicLong((Long)value));
|
||||
} else if (value instanceof Double) {
|
||||
r.getVariables().put(key, new AtomicDouble((Double)value));
|
||||
} else {
|
||||
r.getVariables().put(key, value);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
r.getVariables().put(key, value);
|
||||
if (value instanceof Integer) {
|
||||
r.getVariables().put(key, new AtomicInteger((Integer)value));
|
||||
} else if (value instanceof Long) {
|
||||
r.getVariables().put(key, new AtomicLong((Long)value));
|
||||
} else if (value instanceof Double) {
|
||||
r.getVariables().put(key, new AtomicDouble((Double)value));
|
||||
} else {
|
||||
r.getVariables().put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1639,21 +1887,128 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
|
||||
public List<ReplicaInfo> simGetReplicaInfos(String collection, String shard) {
|
||||
List<ReplicaInfo> replicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(shard, s -> new ArrayList<>());
|
||||
if (replicas == null) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
// make a defensive copy to avoid ConcurrentModificationException
|
||||
return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List collections.
|
||||
* @return list of existing collections.
|
||||
*/
|
||||
public List<String> simListCollections() throws InterruptedException {
|
||||
final Set<String> collections = new HashSet<>();
|
||||
return new ArrayList<>(colShardReplicaMap.keySet());
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Object>> simGetCollectionStats() throws IOException, InterruptedException {
|
||||
Map<String, Map<String, Object>> stats = new TreeMap<>();
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
replicas.forEach(ri -> collections.add(ri.getCollection()));
|
||||
collectionsStatesRef.set(null);
|
||||
ClusterState state = getClusterState();
|
||||
state.forEachCollection(coll -> {
|
||||
Map<String, Object> perColl = new LinkedHashMap<>();
|
||||
stats.put(coll.getName(), perColl);
|
||||
perColl.put("shardsTotal", coll.getSlices().size());
|
||||
Map<String, AtomicInteger> shardState = new TreeMap<>();
|
||||
int noLeader = 0;
|
||||
|
||||
SummaryStatistics docs = new SummaryStatistics();
|
||||
SummaryStatistics bytes = new SummaryStatistics();
|
||||
SummaryStatistics inactiveDocs = new SummaryStatistics();
|
||||
SummaryStatistics inactiveBytes = new SummaryStatistics();
|
||||
|
||||
long deletedDocs = 0;
|
||||
long bufferedDocs = 0;
|
||||
int totalReplicas = 0;
|
||||
int activeReplicas = 0;
|
||||
|
||||
for (Slice s : coll.getSlices()) {
|
||||
shardState.computeIfAbsent(s.getState().toString(), st -> new AtomicInteger())
|
||||
.incrementAndGet();
|
||||
totalReplicas += s.getReplicas().size();
|
||||
if (s.getState() != Slice.State.ACTIVE) {
|
||||
if (!s.getReplicas().isEmpty()) {
|
||||
ReplicaInfo ri = getReplicaInfo(s.getReplicas().iterator().next());
|
||||
if (ri != null) {
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
|
||||
Number numBytes = (Number)ri.getVariable("INDEX.sizeInBytes");
|
||||
if (numDocs != null) {
|
||||
inactiveDocs.addValue(numDocs.doubleValue());
|
||||
}
|
||||
if (numBytes != null) {
|
||||
inactiveBytes.addValue(numBytes.doubleValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
AtomicLong buffered = (AtomicLong)sliceProperties.get(coll.getName()).get(s.getName()).get(BUFFERED_UPDATES);
|
||||
if (buffered != null) {
|
||||
bufferedDocs += buffered.get();
|
||||
}
|
||||
activeReplicas += s.getReplicas().size();
|
||||
Replica leader = s.getLeader();
|
||||
if (leader == null) {
|
||||
noLeader++;
|
||||
if (!s.getReplicas().isEmpty()) {
|
||||
leader = s.getReplicas().iterator().next();
|
||||
}
|
||||
}
|
||||
ReplicaInfo ri = null;
|
||||
if (leader != null) {
|
||||
ri = getReplicaInfo(leader);
|
||||
if (ri == null) {
|
||||
log.warn("Unknown ReplicaInfo for {}", leader);
|
||||
}
|
||||
}
|
||||
if (ri != null) {
|
||||
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
|
||||
Number delDocs = (Number)ri.getVariable("SEARCHER.searcher.deleteDocs");
|
||||
Number numBytes = (Number)ri.getVariable("INDEX.sizeInBytes");
|
||||
if (numDocs != null) {
|
||||
docs.addValue(numDocs.doubleValue());
|
||||
}
|
||||
if (delDocs != null) {
|
||||
deletedDocs += delDocs.longValue();
|
||||
}
|
||||
if (numBytes != null) {
|
||||
bytes.addValue(numBytes.doubleValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
perColl.put("shardsState", shardState);
|
||||
perColl.put(" shardsWithoutLeader", noLeader);
|
||||
perColl.put("totalReplicas", totalReplicas);
|
||||
perColl.put(" activeReplicas", activeReplicas);
|
||||
perColl.put(" inactiveReplicas", totalReplicas - activeReplicas);
|
||||
long totalDocs = (long)docs.getSum() + bufferedDocs;
|
||||
perColl.put("totalActiveDocs", String.format(Locale.ROOT, "%,d", totalDocs));
|
||||
perColl.put(" bufferedDocs", String.format(Locale.ROOT, "%,d", bufferedDocs));
|
||||
perColl.put(" maxActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMax()));
|
||||
perColl.put(" minActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMin()));
|
||||
perColl.put(" avgActiveSliceDocs", String.format(Locale.ROOT, "%,.0f", docs.getMean()));
|
||||
perColl.put("totalInactiveDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getSum()));
|
||||
perColl.put(" maxInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMax()));
|
||||
perColl.put(" minInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMin()));
|
||||
perColl.put(" avgInactiveSliceDocs", String.format(Locale.ROOT, "%,.0f", inactiveDocs.getMean()));
|
||||
perColl.put("totalActiveBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getSum()));
|
||||
perColl.put(" maxActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMax()));
|
||||
perColl.put(" minActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMin()));
|
||||
perColl.put(" avgActiveSliceBytes", String.format(Locale.ROOT, "%,.0f", bytes.getMean()));
|
||||
perColl.put("totalInactiveBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getSum()));
|
||||
perColl.put(" maxInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMax()));
|
||||
perColl.put(" minInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMin()));
|
||||
perColl.put(" avgInactiveSliceBytes", String.format(Locale.ROOT, "%,.0f", inactiveBytes.getMean()));
|
||||
perColl.put("totalActiveDeletedDocs", String.format(Locale.ROOT, "%,d", deletedDocs));
|
||||
});
|
||||
// check collProps and sliceProps too
|
||||
collProperties.forEach((coll, props) -> collections.add(coll));
|
||||
sliceProperties.forEach((coll, slices) -> collections.add(coll));
|
||||
return new ArrayList<>(collections);
|
||||
return stats;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -1700,6 +2055,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
lock.lock();
|
||||
collectionsStatesRef.set(null);
|
||||
saveClusterState.set(true);
|
||||
log.debug("** creating new collection states");
|
||||
try {
|
||||
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
|
@ -1741,7 +2097,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
|
||||
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
|
||||
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
|
||||
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
|
||||
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion + 1, ZkStateReader.CLUSTER_STATE);
|
||||
res.put(coll, dc);
|
||||
});
|
||||
collectionsStatesRef.set(res);
|
||||
|
|
|
@ -214,6 +214,8 @@ public class SimDistribStateManager implements DistribStateManager {
|
|||
private final String id;
|
||||
private final Node root;
|
||||
|
||||
private int juteMaxbuffer = 0xfffff;
|
||||
|
||||
public SimDistribStateManager() {
|
||||
this(null);
|
||||
}
|
||||
|
@ -226,6 +228,8 @@ public class SimDistribStateManager implements DistribStateManager {
|
|||
this.id = IdUtils.timeRandomId();
|
||||
this.root = root != null ? root : createNewRootNode();
|
||||
watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
|
||||
String bufferSize = System.getProperty("jute.maxbuffer", Integer.toString(0xffffff));
|
||||
juteMaxbuffer = Integer.parseInt(bufferSize);
|
||||
}
|
||||
|
||||
public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
|
||||
|
@ -493,6 +497,9 @@ public class SimDistribStateManager implements DistribStateManager {
|
|||
|
||||
@Override
|
||||
public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
|
||||
if (data.length > juteMaxbuffer) {
|
||||
throw new IOException("Len error " + data.length);
|
||||
}
|
||||
multiLock.lock();
|
||||
Node n = null;
|
||||
try {
|
||||
|
|
|
@ -233,6 +233,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
|
|||
}
|
||||
|
||||
private static final Pattern REGISTRY_PATTERN = Pattern.compile("^solr\\.core\\.([\\w.-_]+?)\\.(shard[\\d_]+?)\\.(replica.*)");
|
||||
private static final Pattern METRIC_KEY_PATTERN = Pattern.compile("^metrics:([^:]+?):([^:]+?)(:([^:]+))?$");
|
||||
/**
|
||||
* Simulate getting replica metrics values. This uses per-replica properties set in
|
||||
* {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)} and
|
||||
|
@ -245,33 +246,31 @@ public class SimNodeStateProvider implements NodeStateProvider {
|
|||
if (!liveNodesSet.contains(node)) {
|
||||
throw new RuntimeException("non-live node " + node);
|
||||
}
|
||||
List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(node);
|
||||
if (replicas == null || replicas.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
Map<String, Object> values = new HashMap<>();
|
||||
for (String tag : tags) {
|
||||
String[] parts = tag.split(":");
|
||||
if (parts.length < 3 || !parts[0].equals("metrics")) {
|
||||
Matcher m = METRIC_KEY_PATTERN.matcher(tag);
|
||||
if (!m.matches() || m.groupCount() < 2) {
|
||||
log.warn("Invalid metrics: tag: " + tag);
|
||||
continue;
|
||||
}
|
||||
if (!parts[1].startsWith("solr.core.")) {
|
||||
String registryName = m.group(1);
|
||||
String key = m.group(3) != null ? m.group(2) + m.group(3) : m.group(2);
|
||||
if (!registryName.startsWith("solr.core.")) {
|
||||
// skip - this is probably solr.node or solr.jvm metric
|
||||
continue;
|
||||
}
|
||||
Matcher m = REGISTRY_PATTERN.matcher(parts[1]);
|
||||
m = REGISTRY_PATTERN.matcher(registryName);
|
||||
|
||||
if (!m.matches()) {
|
||||
log.warn("Invalid registry name: " + parts[1]);
|
||||
log.warn("Invalid registry name: " + registryName);
|
||||
continue;
|
||||
}
|
||||
String collection = m.group(1);
|
||||
String shard = m.group(2);
|
||||
String replica = m.group(3);
|
||||
String key = parts.length > 3 ? parts[2] + ":" + parts[3] : parts[2];
|
||||
List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(collection, shard);
|
||||
replicas.forEach(r -> {
|
||||
if (r.getCollection().equals(collection) && r.getShard().equals(shard) && r.getCore().endsWith(replica)) {
|
||||
if (r.getNode().equals(node) && r.getCore().endsWith(replica)) {
|
||||
Object value = r.getVariables().get(key);
|
||||
if (value != null) {
|
||||
values.put(tag, value);
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
|
|||
log.info("Collection ready after " + CloudTestUtils.waitForState(cluster, collectionName, 120, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 2, false, true)) + "ms");
|
||||
|
||||
String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
|
||||
DocCollection docCollection = clusterState.getCollection(collectionName);
|
||||
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
|
||||
|
@ -181,7 +181,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
|
|||
CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
|
||||
collectionName, CloudTestUtils.clusterShape(1, 2, false, true));
|
||||
|
||||
String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
|
||||
DocCollection docCollection = clusterState.getCollection(collectionName);
|
||||
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.autoscaling.sim;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Iterator;
|
||||
import java.util.Locale;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.CloudTestUtils;
|
||||
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
|
||||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@TimeoutSuite(millis = 48 * 3600 * 1000)
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.ComputePlanAction=INFO;org.apache.solr.cloud.autoscaling.ExecutePlanAction=DEBUG;org.apache.solr.cloud.autoscaling.ScheduledTriggers=DEBUG")
|
||||
//@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.CloudTestUtils=TRACE")
|
||||
public class TestSimExtremeIndexing extends SimSolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final int SPEED = 100;
|
||||
// use higher speed for larger scale tests
|
||||
// private static final int SPEED = 500;
|
||||
private static final int NUM_NODES = 200;
|
||||
|
||||
private static final long BATCH_SIZE = 200000;
|
||||
|
||||
private static final long NUM_BATCHES = 5000;
|
||||
// ... or use this for a 1 trillion docs test
|
||||
// private static final long NUM_BATCHES = 5000000;
|
||||
|
||||
// tweak this threshold to test the number of splits
|
||||
private static final long ABOVE_SIZE = 20000000;
|
||||
|
||||
|
||||
private static TimeSource timeSource;
|
||||
private static SolrClient solrClient;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
|
||||
timeSource = cluster.getTimeSource();
|
||||
solrClient = cluster.simGetSolrClient();
|
||||
cluster.simSetUseSystemCollection(false);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownCluster() throws Exception {
|
||||
solrClient = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScaleUp() throws Exception {
|
||||
String collectionName = "testScaleUp_collection";
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||
"conf", 2, 2).setMaxShardsPerNode(10);
|
||||
create.process(solrClient);
|
||||
CloudTestUtils.waitForState(cluster, "failed to create " + collectionName, collectionName,
|
||||
CloudTestUtils.clusterShape(2, 2, false, true));
|
||||
|
||||
//long waitForSeconds = 3 + random().nextInt(5);
|
||||
long waitForSeconds = 1;
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'scaleUpTrigger'," +
|
||||
"'event' : 'indexSize'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'aboveDocs' : " + ABOVE_SIZE + "," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
|
||||
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
long batchSize = BATCH_SIZE;
|
||||
for (long i = 0; i < NUM_BATCHES; i++) {
|
||||
addDocs(collectionName, i * batchSize, batchSize);
|
||||
log.info(String.format(Locale.ROOT, "#### Total docs so far: %,d", ((i + 1) * batchSize)));
|
||||
timeSource.sleep(waitForSeconds);
|
||||
}
|
||||
timeSource.sleep(60000);
|
||||
QueryResponse rsp = solrClient.query(collectionName, params(CommonParams.Q, "*:*"));
|
||||
SolrDocumentList docs = rsp.getResults();
|
||||
assertNotNull(docs);
|
||||
assertEquals(docs.toString(), batchSize * NUM_BATCHES, docs.getNumFound());
|
||||
}
|
||||
|
||||
private void addDocs(String collection, long start, long count) throws Exception {
|
||||
UpdateRequest ureq = new UpdateRequest();
|
||||
ureq.setParam("collection", collection);
|
||||
ureq.setDocIterator(new FakeDocIterator(start, count));
|
||||
solrClient.request(ureq);
|
||||
}
|
||||
|
||||
// lightweight generator of fake documents
|
||||
// NOTE: this iterator only ever returns the same document, which works ok
|
||||
// for our "index update" simulation. Obviously don't use this for real indexing.
|
||||
private static class FakeDocIterator implements Iterator<SolrInputDocument> {
|
||||
final SolrInputDocument doc = new SolrInputDocument();
|
||||
final SolrInputField idField = new SolrInputField("id");
|
||||
|
||||
final long start, count;
|
||||
|
||||
long current, max;
|
||||
|
||||
FakeDocIterator(long start, long count) {
|
||||
this.start = start;
|
||||
this.count = count;
|
||||
current = start;
|
||||
max = start + count;
|
||||
doc.put("id", idField);
|
||||
idField.setValue("foo");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return current < max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrInputDocument next() {
|
||||
current++;
|
||||
return doc;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -131,7 +131,7 @@ public class TestSimNodeLostTrigger extends SimSolrCloudTestCase {
|
|||
trigger.setProcessor(noFirstRunProcessor);
|
||||
trigger.run();
|
||||
|
||||
String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
cluster.simRemoveNode(lostNode, false);
|
||||
AtomicBoolean fired = new AtomicBoolean(false);
|
||||
trigger.setProcessor(event -> {
|
||||
|
|
|
@ -109,7 +109,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
|
|||
|
||||
public void testCreateCollectionAddReplica() throws Exception {
|
||||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
|
||||
int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
|
||||
|
||||
|
@ -134,13 +134,13 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
|
|||
|
||||
public void testCreateCollectionSplitShard() throws Exception {
|
||||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String firstNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String firstNode = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
int firstNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(firstNode, ImplicitSnitch.PORT);
|
||||
|
||||
String secondNode;
|
||||
int secondNodePort;
|
||||
while (true) {
|
||||
secondNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
secondNode = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
secondNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(secondNode, ImplicitSnitch.PORT);
|
||||
if (secondNodePort != firstNodePort) break;
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
|
|||
|
||||
public void testCreateCollectionAddShardUsingPolicy() throws Exception {
|
||||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
|
||||
|
||||
String commands = "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + port + "'}]}}";
|
||||
|
@ -344,7 +344,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
|
|||
assertTrue("sysLoadAvg value is " + ((Number) val.get("sysLoadAvg")).doubleValue(), Double.compare(((Number) val.get("sysLoadAvg")).doubleValue(), 0.0d) > 0);
|
||||
}
|
||||
// simulator doesn't have Overseer, so just pick a random node
|
||||
String overseerNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String overseerNode = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
solrClient.request(CollectionAdminRequest.addRole(overseerNode, "overseer"));
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Map<String, Object> data = Utils.getJson(cluster.getDistribStateManager(), ZkStateReader.ROLES);
|
||||
|
|
|
@ -482,7 +482,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
cluster.simRemoveNode(lostNodeName, false);
|
||||
boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
|
@ -650,7 +650,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
"'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
|
||||
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
|
@ -808,7 +808,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
|
||||
// pick overseer node
|
||||
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
|
||||
// add a node
|
||||
String node = cluster.simAddNode();
|
||||
|
@ -867,7 +867,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
|
||||
overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||
|
||||
// create another node
|
||||
log.info("====== ADD NODE 1");
|
||||
|
|
|
@ -94,6 +94,7 @@ public class Policy implements MapWriter {
|
|||
final List<Pair<String, Type>> params;
|
||||
final List<String> perReplicaAttributes;
|
||||
final int zkVersion;
|
||||
final boolean empty;
|
||||
|
||||
public Policy() {
|
||||
this(Collections.emptyMap());
|
||||
|
@ -104,6 +105,7 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public Policy(Map<String, Object> jsonMap, int version) {
|
||||
this.empty = jsonMap.get(CLUSTER_PREFERENCES) == null && jsonMap.get(CLUSTER_POLICY) == null && jsonMap.get(POLICIES) == null;
|
||||
this.zkVersion = version;
|
||||
int[] idx = new int[1];
|
||||
List<Preference> initialClusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCES, emptyList())).stream()
|
||||
|
@ -156,6 +158,7 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
|
||||
private Policy(Map<String, List<Clause>> policies, List<Clause> clusterPolicy, List<Preference> clusterPreferences, int version) {
|
||||
this.empty = policies == null && clusterPolicy == null && clusterPreferences == null;
|
||||
this.zkVersion = version;
|
||||
this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
|
||||
this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
|
||||
|
@ -281,12 +284,17 @@ public class Policy implements MapWriter {
|
|||
return p.compare(r1, r2, false);
|
||||
});
|
||||
} catch (Exception e) {
|
||||
// log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
|
||||
// clusterPreferences,
|
||||
// lastComparison[0],
|
||||
// lastComparison[1],
|
||||
// Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
|
||||
log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
|
||||
clusterPreferences,
|
||||
lastComparison[0],
|
||||
lastComparison[1],
|
||||
Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
|
||||
throw e;
|
||||
lastComparison[0].node,
|
||||
lastComparison[1].node,
|
||||
matrix.size());
|
||||
throw new RuntimeException(e.getMessage());
|
||||
}
|
||||
p.setApproxVal(tmpMatrix);
|
||||
}
|
||||
|
@ -405,15 +413,6 @@ public class Policy implements MapWriter {
|
|||
return currentSession.getViolations();
|
||||
}
|
||||
|
||||
public boolean undo() {
|
||||
if (currentSession.parent != null) {
|
||||
currentSession = currentSession.parent;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
public Session getCurrentSession() {
|
||||
return currentSession;
|
||||
}
|
||||
|
@ -461,6 +460,10 @@ public class Policy implements MapWriter {
|
|||
return Utils.toJSONString(this);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return empty;
|
||||
}
|
||||
|
||||
/*This stores the logical state of the system, given a policy and
|
||||
* a cluster state.
|
||||
*
|
||||
|
@ -475,12 +478,10 @@ public class Policy implements MapWriter {
|
|||
List<Clause> expandedClauses;
|
||||
List<Violation> violations = new ArrayList<>();
|
||||
Transaction transaction;
|
||||
private Session parent = null;
|
||||
|
||||
private Session(List<String> nodes, SolrCloudManager cloudManager,
|
||||
List<Row> matrix, List<Clause> expandedClauses, int znodeVersion,
|
||||
NodeStateProvider nodeStateProvider, Transaction transaction, Session parent) {
|
||||
this.parent = parent;
|
||||
NodeStateProvider nodeStateProvider, Transaction transaction) {
|
||||
this.transaction = transaction;
|
||||
this.nodes = nodes;
|
||||
this.cloudManager = cloudManager;
|
||||
|
@ -552,7 +553,7 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
|
||||
Session copy() {
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider, transaction, this);
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider, transaction);
|
||||
}
|
||||
|
||||
public Row getNode(String node) {
|
||||
|
|
|
@ -351,7 +351,7 @@ public class PolicyHelper {
|
|||
TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
|
||||
synchronized (lockObj) {
|
||||
sessionWrapper.status = Status.EXECUTING;
|
||||
log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
|
||||
log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
|
||||
sessionWrapper.createTime,
|
||||
this.sessionWrapper.createTime);
|
||||
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
|
||||
|
@ -362,7 +362,7 @@ public class PolicyHelper {
|
|||
//one thread who is waiting for this need to be notified.
|
||||
lockObj.notify();
|
||||
} else {
|
||||
log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
|
||||
log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
|
||||
//else just ignore it
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,10 +49,12 @@ public class ReplicaInfo implements MapWriter {
|
|||
this.collection = coll;
|
||||
this.shard = shard;
|
||||
this.type = r.getType();
|
||||
this.isLeader = r.getBool(LEADER_PROP, false);
|
||||
boolean maybeLeader = r.getBool(LEADER_PROP, false);
|
||||
if (vals != null) {
|
||||
this.variables.putAll(vals);
|
||||
maybeLeader = "true".equals(String.valueOf(vals.getOrDefault(LEADER_PROP, maybeLeader)));
|
||||
}
|
||||
this.isLeader = maybeLeader;
|
||||
this.node = r.getNodeName();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue