mirror of https://github.com/apache/lucene.git
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/lucene-solr
commit 776899b4e8

@@ -124,6 +124,8 @@ Bug Fixes
 
+* SOLR-11794: PULL replicas stop replicating after collection RELOAD (Samuel Tatipamula, Tomás Fernández Löbbe)
 
+* SOLR-11714: AddReplicaSuggester / ComputePlanAction infinite loop. (ab)
 
 Optimizations
 ----------------------

@@ -38,7 +38,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
     ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
     String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
     if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
-      return new NoneSuggester();
+      return NoneSuggester.get(session);
     }
 
     Suggester suggester = super.getSuggester(session, event, cloudManager);
@@ -57,7 +57,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
       }
     }
 
-    if (!anyCollections) return new NoneSuggester();
+    if (!anyCollections) return NoneSuggester.get(session);
     return suggester;
   }
 }

@@ -19,29 +19,26 @@ package org.apache.solr.cloud.autoscaling;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
-
 /**
  * This class is responsible for using the configured policy and preferences
  * with the hints provided by the trigger event to compute the required cluster operations.
@@ -63,16 +60,43 @@ public class ComputePlanAction extends TriggerActionBase {
       }
       PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
       Policy.Session session = sessionWrapper.get();
+      ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
       if (log.isTraceEnabled()) {
-        ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
         log.trace("-- session: {}", session);
-        log.trace("-- state: {}", state);
+        log.trace("-- state: {}", clusterState);
       }
       try {
         Suggester suggester = getSuggester(session, event, cloudManager);
-        while (true) {
+        int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
+        int requestedOperations = getRequestedNumOps(event);
+        if (requestedOperations > maxOperations) {
+          log.warn("Requested number of operations {} higher than maximum {}, adjusting...",
+              requestedOperations, maxOperations);
+        }
+        int opCount = 0;
+        int opLimit = maxOperations;
+        if (requestedOperations > 0) {
+          opLimit = requestedOperations;
+        }
+        do {
           SolrRequest operation = suggester.getSuggestion();
-          if (operation == null) break;
+          opCount++;
+          // prepare suggester for the next iteration
+          if (suggester.getSession() != null) {
+            session = suggester.getSession();
+          }
+          suggester = getSuggester(session, event, cloudManager);
+
+          // break on first null op
+          // unless a specific number of ops was requested
+          if (operation == null) {
+            if (requestedOperations < 0) {
+              break;
+            } else {
+              log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try.");
+              continue;
+            }
+          }
           log.info("Computed Plan: {}", operation.getParams());
           Map<String, Object> props = context.getProperties();
           props.compute("operations", (k, v) -> {
@@ -81,15 +105,14 @@ public class ComputePlanAction extends TriggerActionBase {
             operations.add(operation);
             return operations;
           });
-          session = suggester.getSession();
-          suggester = getSuggester(session, event, cloudManager);
-        }
+        } while (opCount < opLimit);
       } finally {
         releasePolicySession(sessionWrapper, session);
       }
     } catch (Exception e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unexpected exception while processing event: " + event, e); }
+          "Unexpected exception while processing event: " + event, e);
+    }
   }
 
   private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
@@ -98,6 +121,33 @@ public class ComputePlanAction extends TriggerActionBase {
 
   }
 
+  protected int getMaxNumOps(TriggerEvent event, AutoScalingConfig autoScalingConfig, ClusterState clusterState) {
+    // estimate a maximum default limit that should be sufficient for most purposes:
+    // number of nodes * total number of replicas * 3
+    AtomicInteger totalRF = new AtomicInteger();
+    clusterState.forEachCollection(coll -> totalRF.addAndGet(coll.getReplicationFactor() * coll.getSlices().size()));
+    int totalMax = clusterState.getLiveNodes().size() * totalRF.get() * 3;
+    int maxOp = (Integer)autoScalingConfig.getProperties().getOrDefault(AutoScalingParams.MAX_COMPUTE_OPERATIONS, totalMax);
+    Object o = event.getProperty(AutoScalingParams.MAX_COMPUTE_OPERATIONS, maxOp);
+    try {
+      return Integer.parseInt(String.valueOf(o));
+    } catch (Exception e) {
+      log.warn("Invalid '" + AutoScalingParams.MAX_COMPUTE_OPERATIONS + "' event property: " + o + ", using default " + maxOp);
+      return maxOp;
+    }
+  }
+
+  protected int getRequestedNumOps(TriggerEvent event) {
+    Collection<TriggerEvent.Op> ops = (Collection<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
+    if (ops.isEmpty()) {
+      return -1;
+    } else {
+      return ops.size();
+    }
+  }
+
+  private static final String START = "__start__";
+
   protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
     Suggester suggester;
     switch (event.getEventType()) {
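Note on the two limits used by the compute loop above: opLimit is the number of explicitly requested ops when the event carries TriggerEvent.REQUESTED_OPS, and otherwise the getMaxNumOps() estimate. As a worked example of that estimate (with made-up numbers): a cluster of 3 live nodes holding one collection with 2 shards at replicationFactor 4 gives totalRF = 4 * 2 = 8, so the default cap is 3 * 8 * 3 = 72 operations; the AutoScalingParams.MAX_COMPUTE_OPERATIONS key, set in the autoscaling config properties or on the event itself, overrides it.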
@@ -110,51 +160,21 @@ public class ComputePlanAction extends TriggerActionBase {
             .hint(Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
         break;
       case SEARCHRATE:
-        Map<String, Map<String, Double>> hotShards = (Map<String, Map<String, Double>>)event.getProperty(AutoScalingParams.SHARD);
-        Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
-        List<ReplicaInfo> hotReplicas = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
-        Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
-
-        if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
-          // node -> MOVEREPLICA
-          if (hotNodes.isEmpty()) {
-            log.warn("Neither hot replicas / collection nor nodes are reported in event: " + event);
-            return NoneSuggester.INSTANCE;
-          }
-          suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA);
-          for (String node : hotNodes.keySet()) {
-            suggester = suggester.hint(Suggester.Hint.SRC_NODE, node);
-          }
-        } else {
-          // collection || shard || replica -> ADDREPLICA
-          suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
-          Set<Pair> collectionShards = new HashSet<>();
-          hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> collectionShards.add(new Pair(coll, s))));
-          for (Pair<String, String> colShard : collectionShards) {
-            suggester = suggester.hint(Suggester.Hint.COLL_SHARD, colShard);
-          }
-        }
-        break;
       case METRIC:
-        Map<String, Number> sourceNodes = (Map<String, Number>) event.getProperty(AutoScalingParams.NODE);
-        String collection = (String) event.getProperty(AutoScalingParams.COLLECTION);
-        String shard = (String) event.getProperty(AutoScalingParams.SHARD);
-        String preferredOp = (String) event.getProperty(PREFERRED_OP);
-        if (sourceNodes.isEmpty()) {
-          log.warn("No nodes reported in event: " + event);
-          return NoneSuggester.INSTANCE;
+        List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
+        int start = (Integer)event.getProperty(START, 0);
+        if (ops.isEmpty() || start >= ops.size()) {
+          return NoneSuggester.get(session);
         }
-        CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp == null ? CollectionParams.CollectionAction.MOVEREPLICA.toLower() : preferredOp);
-        suggester = session.getSuggester(action);
-        for (String node : sourceNodes.keySet()) {
-          suggester = suggester.hint(Suggester.Hint.SRC_NODE, node);
+        TriggerEvent.Op op = ops.get(start);
+        suggester = session.getSuggester(op.getAction());
+        for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
+          suggester = suggester.hint(e.getKey(), e.getValue());
        }
-        if (collection != null) {
-          if (shard == null) {
-            suggester = suggester.hint(Suggester.Hint.COLL, collection);
-          } else {
-            suggester = suggester.hint(Suggester.Hint.COLL_SHARD, new Pair(collection, shard));
-          }
+        if (++start >= ops.size()) {
+          event.getProperties().remove(START);
+        } else {
+          event.getProperties().put(START, start);
         }
         break;
       default:

@@ -18,9 +18,11 @@
 package org.apache.solr.cloud.autoscaling;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,12 +32,15 @@ import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.core.SolrResourceLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +70,7 @@ public class MetricTrigger extends TriggerBase {
       throw new IllegalArgumentException("When 'shard' is other than #ANY then collection name must be also other than #ANY");
     }
     node = (String) properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
-    preferredOp = (String) properties.getOrDefault(PREFERRED_OP, null);
+    preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
   }
 
   @Override
@@ -182,9 +187,23 @@ public class MetricTrigger extends TriggerBase {
       if (!shard.equals(Policy.ANY)) {
         properties.put(AutoScalingParams.SHARD, shard);
       }
-      if (preferredOp != null) {
-        properties.put(PREFERRED_OP, preferredOp);
+      properties.put(PREFERRED_OP, preferredOp);
+
+      // specify requested ops
+      List<Op> ops = new ArrayList<>(hotNodes.size());
+      for (String n : hotNodes.keySet()) {
+        Op op = new Op(CollectionParams.CollectionAction.get(preferredOp));
+        op.setHint(Suggester.Hint.SRC_NODE, n);
+        if (!collection.equals(Policy.ANY)) {
+          if (!shard.equals(Policy.ANY)) {
+            op.setHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
+          } else {
+            op.setHint(Suggester.Hint.COLL, collection);
+          }
+        }
+        ops.add(op);
+      }
+      properties.put(TriggerEvent.REQUESTED_OPS, ops);
     }
   }
 }

@@ -31,9 +31,12 @@ import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.metrics.SolrCoreMetricManager;
@@ -145,12 +148,14 @@ public class SearchRateTrigger extends TriggerBase {
 
     Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
     Map<String, AtomicDouble> nodeRates = new HashMap<>();
+    Map<String, Integer> replicationFactors = new HashMap<>();
 
     for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
       Map<String, ReplicaInfo> metricTags = new HashMap<>();
       // coll, shard, replica
       Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
       infos.forEach((coll, shards) -> {
+        replicationFactors.computeIfAbsent(coll, c -> shards.size());
         shards.forEach((sh, replicas) -> {
           replicas.forEach(replica -> {
             // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
@@ -261,7 +266,41 @@ public class SearchRateTrigger extends TriggerBase {
       }
     });
 
-    if (processor.process(new SearchRateEvent(getName(), eventTime.get(), hotNodes, hotCollections, hotShards, hotReplicas))) {
+    // calculate the number of replicas to add to each hot shard, based on how much the rate was
+    // exceeded - but within limits.
+    final List<TriggerEvent.Op> ops = new ArrayList<>();
+    if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
+      // move replicas around
+      hotNodes.forEach((n, r) -> {
+        ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
+      });
+    } else {
+      // add replicas
+      Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
+
+      hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> {
+        List<Pair<String, String>> perShard = hints
+            .computeIfAbsent(coll, c -> new HashMap<>())
+            .computeIfAbsent(s, sh -> new ArrayList<>());
+        addHints(coll, s, r, replicationFactors.get(coll), perShard);
+      }));
+      hotReplicas.forEach(ri -> {
+        double r = (Double)ri.getVariable(AutoScalingParams.RATE);
+        // add only if not already accounted for in hotShards
+        List<Pair<String, String>> perShard = hints
+            .computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+            .computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
+        if (perShard.isEmpty()) {
+          addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+        }
+      });
+
+      hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
+        ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+      })));
+    }
+
+    if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
       // update lastEvent times
       hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
       hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
@@ -271,6 +310,19 @@ public class SearchRateTrigger extends TriggerBase {
     }
   }
 
+  private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+    int numReplicas = (int)Math.round((r - rate) / (double) replicationFactor);
+    if (numReplicas < 1) {
+      numReplicas = 1;
+    }
+    if (numReplicas > 3) {
+      numReplicas = 3;
+    }
+    for (int i = 0; i < numReplicas; i++) {
+      hints.add(new Pair(collection, shard));
+    }
+  }
+
   private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
     Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
     long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
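Note: addHints() turns rate overshoot into a bounded number of replica hints. A worked example with made-up numbers: for a configured trigger rate of 1.0, an observed shard rate r = 9.0 and replicationFactor = 2, Math.round((9.0 - 1.0) / 2) = 4, which the [1, 3] clamp reduces to 3, so three (collection, shard) pairs are added and later become three ADDREPLICA ops.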
@@ -282,10 +334,11 @@ public class SearchRateTrigger extends TriggerBase {
   }
 
   public static class SearchRateEvent extends TriggerEvent {
-    public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
+    public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
                            Map<String, Double> hotCollections,
                            Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
       super(TriggerEventType.SEARCHRATE, source, eventTime, null);
+      properties.put(TriggerEvent.REQUESTED_OPS, ops);
       properties.put(AutoScalingParams.COLLECTION, hotCollections);
       properties.put(AutoScalingParams.SHARD, hotShards);
       properties.put(AutoScalingParams.REPLICA, hotReplicas);

@@ -17,11 +17,14 @@
 package org.apache.solr.cloud.autoscaling;
 
 import java.io.IOException;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.IdUtils;
 
@@ -33,6 +36,41 @@ public class TriggerEvent implements MapWriter {
   public static final String REPLAYING = "replaying";
   public static final String NODE_NAMES = "nodeNames";
   public static final String EVENT_TIMES = "eventTimes";
+  public static final String REQUESTED_OPS = "requestedOps";
+
+  public static final class Op {
+    private final CollectionParams.CollectionAction action;
+    private final EnumMap<Suggester.Hint, Object> hints = new EnumMap<>(Suggester.Hint.class);
+
+    public Op(CollectionParams.CollectionAction action) {
+      this.action = action;
+    }
+
+    public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) {
+      this.action = action;
+      this.hints.put(hint, hintValue);
+    }
+
+    public void setHint(Suggester.Hint hint, Object value) {
+      hints.put(hint, value);
+    }
+
+    public CollectionParams.CollectionAction getAction() {
+      return action;
+    }
+
+    public EnumMap<Suggester.Hint, Object> getHints() {
+      return hints;
+    }
+
+    @Override
+    public String toString() {
+      return "Op{" +
+          "action=" + action +
+          ", hints=" + hints +
+          '}';
+    }
+  }
+
   protected final String id;
   protected final String source;
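Note: a minimal sketch of the intended Op round trip, assembled only from the producer code in MetricTrigger/SearchRateTrigger and the consumer code in ComputePlanAction shown in this diff; the collection and shard names are hypothetical:

    // producer side, inside a trigger: request a specific operation with hints
    TriggerEvent.Op op = new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA);
    op.setHint(Suggester.Hint.COLL_SHARD, new Pair<>("myColl", "shard1")); // hypothetical target
    properties.put(TriggerEvent.REQUESTED_OPS, Collections.singletonList(op));

    // consumer side, in ComputePlanAction.getSuggester(): replay the action and hints
    Suggester suggester = session.getSuggester(op.getAction());
    for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
      suggester = suggester.hint(e.getKey(), e.getValue());
    }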
@@ -93,6 +131,18 @@ public class TriggerEvent implements MapWriter {
     return properties.get(name);
   }
 
+  /**
+   * Get a named event property or default value if missing.
+   */
+  public Object getProperty(String name, Object defaultValue) {
+    Object v = properties.get(name);
+    if (v == null) {
+      return defaultValue;
+    } else {
+      return v;
+    }
+  }
+
   /**
    * Event type.
   */

@@ -310,6 +310,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
         .withFunctionName("indexOf", IndexOfEvaluator.class)
         .withFunctionName("columnCount", ColumnCountEvaluator.class)
         .withFunctionName("rowCount", RowCountEvaluator.class)
+        .withFunctionName("fuzzyKmeans", FuzzyKmeansEvaluator.class)
+        .withFunctionName("getMembershipMatrix", GetMembershipMatrixEvaluator.class)
+        .withFunctionName("multiKmeans", MultiKmeansEvaluator.class)
 
         // Boolean Stream Evaluators
 

@@ -80,9 +80,9 @@ import static org.apache.solr.update.processor.DistributedUpdateProcessor.Distri
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
 
-/**
- * This holds references to the transaction logs and pointers for the document IDs to their
- * exact positions in the transaction logs.
+/**
+ * This holds references to the transaction logs. It also keeps a map of unique key to location in log
+ * (along with the update's version). This map is only cleared on soft or hard commit
  *
  * @lucene.experimental
  */

@@ -140,17 +140,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     events.clear();
     listenerEvents.clear();
     lastActionExecutedAt.set(0);
-    while (cluster.getJettySolrRunners().size() < 2) {
-      // perhaps a test stopped a node but didn't start it back
-      // lets start a node
-      cluster.startJettySolrRunner();
-    }
     // clear any events or markers
     // todo: consider the impact of such cleanup on regular cluster restarts
     deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
     deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
     deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
     deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+    while (cluster.getJettySolrRunners().size() < 2) {
+      // perhaps a test stopped a node but didn't start it back
+      // lets start a node
+      cluster.startJettySolrRunner();
+    }
     cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
   }
 
@@ -512,6 +512,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       fail("The TriggerAction should have been created by now");
     }
 
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
     String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
     cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
@@ -569,7 +571,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_added_trigger'," +
+        "'name' : 'node_added_triggerCTOOR'," +
         "'event' : 'nodeAdded'," +
         "'waitFor' : '" + waitForSeconds + "s'," +
         "'enabled' : true," +
@@ -667,7 +669,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     CloudSolrClient solrClient = cluster.getSolrClient();
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_added_trigger1'," +
+        "'name' : 'node_added_triggerEQ'," +
         "'event' : 'nodeAdded'," +
         "'waitFor' : '" + waitForSeconds + "s'," +
         "'enabled' : true," +
@@ -725,7 +727,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     CloudSolrClient solrClient = cluster.getSolrClient();
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_added_trigger'," +
+        "'name' : 'node_added_triggerEFRS'," +
         "'event' : 'nodeAdded'," +
         "'waitFor' : '10s'," +
         "'enabled' : true," +
@@ -890,7 +892,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     log.info("====== ADD TRIGGERS");
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_added_trigger'," +
+        "'name' : 'node_added_triggerMR'," +
         "'event' : 'nodeAdded'," +
         "'waitFor' : '1s'," +
         "'enabled' : true," +
@@ -902,7 +904,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_lost_trigger'," +
+        "'name' : 'node_lost_triggerMR'," +
         "'event' : 'nodeLost'," +
         "'waitFor' : '1s'," +
         "'enabled' : true," +
@@ -995,7 +997,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     CloudSolrClient solrClient = cluster.getSolrClient();
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'node_added_trigger'," +
+        "'name' : 'node_added_triggerL'," +
         "'event' : 'nodeAdded'," +
         "'waitFor' : '" + waitForSeconds + "s'," +
         "'enabled' : true," +
@@ -1016,7 +1018,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'set-listener' : " +
         "{" +
         "'name' : 'foo'," +
-        "'trigger' : 'node_added_trigger'," +
+        "'trigger' : 'node_added_triggerL'," +
         "'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
         "'beforeAction' : 'test'," +
         "'afterAction' : ['test', 'test1']," +
@@ -1031,7 +1033,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'set-listener' : " +
         "{" +
         "'name' : 'bar'," +
-        "'trigger' : 'node_added_trigger'," +
+        "'trigger' : 'node_added_triggerL'," +
         "'stage' : ['FAILED','SUCCEEDED']," +
         "'beforeAction' : ['test', 'test1']," +
         "'afterAction' : 'test'," +
@@ -1387,7 +1389,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   @Test
-  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
+  //@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
   public void testSearchRate() throws Exception {
     // start a few more jetty-s
     for (int i = 0; i < 3; i++) {
@@ -1435,9 +1437,19 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     // wait for listener to capture the SUCCEEDED stage
-    Thread.sleep(2000);
-    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
-    CapturedEvent ev = listenerEvents.get("srt").get(0);
+    Thread.sleep(5000);
+    List<CapturedEvent> events = listenerEvents.get("srt");
+    assertEquals(listenerEvents.toString(), 4, events.size());
+    assertEquals("AFTER_ACTION", events.get(0).stage.toString());
+    assertEquals("compute", events.get(0).actionName);
+    assertEquals("AFTER_ACTION", events.get(1).stage.toString());
+    assertEquals("execute", events.get(1).actionName);
+    assertEquals("AFTER_ACTION", events.get(2).stage.toString());
+    assertEquals("test", events.get(2).actionName);
+    assertEquals("SUCCEEDED", events.get(3).stage.toString());
+    assertNull(events.get(3).actionName);
+
+    CapturedEvent ev = events.get(0);
     long now = timeSource.getTime();
     // verify waitFor
     assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
@@ -1471,6 +1483,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertEquals(collectionRate, totalNodeRate.get(), 5.0);
     assertEquals(collectionRate, totalShardRate.get(), 5.0);
     assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+
+    // check operations
+    List<Map<String, Object>> ops = (List<Map<String, Object>>)ev.context.get("properties.operations");
+    assertNotNull(ops);
+    assertTrue(ops.size() > 1);
+    for (Map<String, Object> m : ops) {
+      assertEquals("ADDREPLICA", m.get("params.action"));
+    }
   }
 
   @Test
@@ -1556,46 +1576,46 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
-    // todo uncomment the following code once SOLR-11714 is fixed
     // find a new replica and create its metric name
-    // replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
-    // coreName = replica.getCoreName();
-    // replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
-    // registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
-    // tag = "metrics:" + registry + ":INDEX.sizeInBytes";
-    //
-    // setTriggerCommand = "{" +
-    //     "'set-trigger' : {" +
-    //     "'name' : 'metric_trigger'," +
-    //     "'event' : 'metric'," +
-    //     "'waitFor' : '" + waitForSeconds + "s'," +
-    //     "'enabled' : true," +
-    //     "'metric': '" + tag + "'" +
-    //     "'above' : 100.0," +
-    //     "'collection': '" + collectionName + "'" +
-    //     "'shard':'" + shardId + "'" +
-    //     "'preferredOperation':'addreplica'" +
-    //     "'actions' : [" +
-    //     "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-    //     "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
-    //     "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
-    //     "]" +
-    //     "}}";
-    // req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
-    // response = solrClient.request(req);
-    // assertEquals(response.get("result").toString(), "success");
-    //
-    // triggerFiredLatch = new CountDownLatch(1);
-    // listenerEvents.clear();
-    // await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
-    // assertTrue("The trigger did not fire at all", await);
-    // // wait for listener to capture the SUCCEEDED stage
-    // Thread.sleep(2000);
-    // assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
-    // ev = listenerEvents.get("srt").get(0);
-    // now = timeSource.getTime();
-    // // verify waitFor
-    // assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
-    // assertEquals(collectionName, ev.event.getProperties().get("collection"));
-    // docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    // assertEquals(3, docCollection.getReplicas().size());
+    replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+    coreName = replica.getCoreName();
+    replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+    registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+    tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'metric_trigger'," +
+        "'event' : 'metric'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'metric': '" + tag + "'" +
+        "'above' : 100.0," +
+        "'collection': '" + collectionName + "'" +
+        "'shard':'" + shardId + "'" +
+        "'preferredOperation':'addreplica'" +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    triggerFiredLatch = new CountDownLatch(1);
+    listenerEvents.clear();
+    await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    Thread.sleep(2000);
+    assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+    ev = listenerEvents.get("srt").get(0);
+    now = timeSource.getTime();
+    // verify waitFor
+    assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+    assertEquals(collectionName, ev.event.getProperties().get("collection"));
+    docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertEquals(3, docCollection.getReplicas().size());
   }
 }

@@ -107,6 +107,8 @@ public class SimCloudManager implements SolrCloudManager {
   private SolrResourceLoader loader;
 
   private static int nodeIdPort = 10000;
+  public static int DEFAULT_DISK = 1000; // 1000 GB
+  public static int DEFAULT_IDX_SIZE_BYTES = 1000000000; // 1 GB
 
   /**
    * Create a simulated cluster. This cluster uses the following components:
@@ -227,7 +229,7 @@ public class SimCloudManager implements SolrCloudManager {
     values.put(ImplicitSnitch.PORT, port);
     values.put(ImplicitSnitch.NODE, nodeId);
     values.put(ImplicitSnitch.CORES, 0);
-    values.put(ImplicitSnitch.DISK, 1000);
+    values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
     values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
     values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
     values.put("sysprop.java.version", System.getProperty("java.version"));

@@ -324,6 +324,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    * @param results result of the operation
    */
   public void simAddReplica(ZkNodeProps message, NamedList results) throws Exception {
+    if (message.getStr(CommonAdminParams.ASYNC) != null) {
+      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
+    }
     ClusterState clusterState = getClusterState();
     DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
@@ -394,7 +397,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     // mark replica as active
     replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
     // add a property expected in tests
-    replicaInfo.getVariables().put(Suggestion.coreidxsize, 123450000);
+    replicaInfo.getVariables().put(Suggestion.coreidxsize, SimCloudManager.DEFAULT_IDX_SIZE_BYTES);
 
     replicas.add(replicaInfo);
     // at this point nuke our cached DocCollection state
@@ -411,9 +414,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
     Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
     if (disk == null) {
-      disk = 1000;
+      disk = SimCloudManager.DEFAULT_DISK;
     }
-    cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 10);
+    cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
     if (runLeaderElection) {
       simRunLeaderElection(Collections.singleton(replicaInfo.getCollection()), true);
     }
@@ -449,7 +452,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     if (disk == null || disk == 0) {
       throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);
     }
-    cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 10);
+    cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 1);
   }
   LOG.trace("-- simRemoveReplica {}", ri);
   simRunLeaderElection(Collections.singleton(ri.getCollection()), true);
@@ -722,7 +725,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       collProperties.clear();
       sliceProperties.clear();
       leaderThrottles.clear();
-      cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> values.put("cores", 0));
+      cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
+        values.put(ImplicitSnitch.CORES, 0);
+        values.put(ImplicitSnitch.DISK, 1000);
+      });
       collectionsStatesRef.set(null);
     } finally {
       lock.unlock();

@@ -36,7 +36,9 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.ComputePlanAction;
@@ -49,6 +51,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.util.LogLevel;
 import org.junit.Before;
@@ -490,7 +493,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
   }
 
   @Test
-  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
+  //@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
   public void testSearchRate() throws Exception {
     SolrClient solrClient = cluster.simGetSolrClient();
     String setTriggerCommand = "{" +
@@ -529,9 +532,10 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
 
     log.info("Ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(2, 10)) + " ms");
 
-    // collect the node names
+    // collect the node names for shard1
     Set<String> nodes = new HashSet<>();
     cluster.getSimClusterStateProvider().getClusterState().getCollection(collectionName)
+        .getSlice("shard1")
         .getReplicas()
         .forEach(r -> nodes.add(r.getNodeName()));
 
@@ -539,11 +543,28 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     // simulate search traffic
     cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
 
-    // boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
-    // assertTrue("The trigger did not fire at all", await);
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
+    CapturedEvent ev = listenerEvents.get("srt").get(0);
+    assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());
+    Map<String, Number> m = (Map<String, Number>)ev.event.getProperty("node");
+    assertNotNull(m);
+    assertEquals(nodes.size(), m.size());
+    assertEquals(nodes, m.keySet());
+    m.forEach((k, v) -> assertEquals(4.0, v.doubleValue(), 0.01));
+    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+    assertNotNull(ops);
+    assertEquals(3, ops.size());
+    ops.forEach(op -> {
+      assertEquals(CollectionParams.CollectionAction.ADDREPLICA, op.getAction());
+      assertEquals(1, op.getHints().size());
+      Pair<String, String> hint = (Pair<String, String>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+      assertNotNull(hint);
+      assertEquals(collectionName, hint.first());
+      assertEquals("shard1", hint.second());
+    });
   }
 }

@@ -41,6 +41,8 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.ComputePlanAction;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
 import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
 import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
 import org.apache.solr.cloud.autoscaling.TriggerActionBase;
@@ -1142,6 +1144,8 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
         "'enabled' : true," +
         "'rate' : 1.0," +
         "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}" +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
         "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
         "]" +
         "}}";
@@ -1155,6 +1159,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
         "'name' : 'srt'," +
         "'trigger' : 'search_rate_trigger'," +
         "'stage' : ['FAILED','SUCCEEDED']," +
+        "'afterAction': ['compute', 'execute', 'test']," +
         "'class' : '" + TestTriggerListener.class.getName() + "'" +
         "}" +
         "}";
@@ -1172,9 +1177,20 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("The trigger did not fire at all", await);
     // wait for listener to capture the SUCCEEDED stage
-    cluster.getTimeSource().sleep(2000);
-    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
-    CapturedEvent ev = listenerEvents.get("srt").get(0);
+    cluster.getTimeSource().sleep(5000);
+    List<CapturedEvent> events = listenerEvents.get("srt");
+
+    assertEquals(listenerEvents.toString(), 4, events.size());
+    assertEquals("AFTER_ACTION", events.get(0).stage.toString());
+    assertEquals("compute", events.get(0).actionName);
+    assertEquals("AFTER_ACTION", events.get(1).stage.toString());
+    assertEquals("execute", events.get(1).actionName);
+    assertEquals("AFTER_ACTION", events.get(2).stage.toString());
+    assertEquals("test", events.get(2).actionName);
+    assertEquals("SUCCEEDED", events.get(3).stage.toString());
+    assertNull(events.get(3).actionName);
+
+    CapturedEvent ev = events.get(0);
     long now = cluster.getTimeSource().getTime();
     // verify waitFor
     assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
@@ -1208,5 +1224,13 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     assertTrue(totalNodeRate.get() > 100.0);
     assertTrue(totalShardRate.get() > 100.0);
     assertTrue(totalReplicaRate.get() > 100.0);
+
+    // check operations
+    List<Map<String, Object>> ops = (List<Map<String, Object>>)ev.context.get("properties.operations");
+    assertNotNull(ops);
+    assertTrue(ops.size() > 1);
+    for (Map<String, Object> m : ops) {
+      assertEquals("ADDREPLICA", m.get("params.action"));
+    }
   }
 }

@@ -16,7 +16,6 @@
  */
 package org.apache.solr.handler.component;
 
-import org.apache.solr.SolrTestCaseJ4.SuppressPointFields;
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.junit.Test;
 
@@ -26,7 +25,6 @@ import org.junit.Test;
  *
  * @since solr 1.5
  */
-@SuppressPointFields(bugUrl="https://issues.apache.org/jira/browse/SOLR-11173")
 public class DistributedTermsComponentTest extends BaseDistributedSearchTestCase {
 
   @Test

@@ -223,8 +223,6 @@ Reads the field text and generates edge n-gram tokens of sizes in the given rang
 
 `maxGramSize`: (integer, default is 1) The maximum n-gram size, must be >= `minGramSize`.
 
-`side`: ("front" or "back", default is "front") Whether to compute the n-grams from the beginning (front) of the text or from the end (back).
-
 *Example:*
 
 Default behavior (min and max default to 1):
@@ -255,21 +253,6 @@ Edge n-gram range of 2 to 5
 
 **Out:** "ba", "bab", "baba", "babal"
 
-*Example:*
-
-Edge n-gram range of 2 to 5, from the back side:
-
-[source,xml]
-----
-<analyzer>
-  <tokenizer class="solr.EdgeNGramTokenizerFactory" minGramSize="2" maxGramSize="5" side="back"/>
-</analyzer>
-----
-
-*In:* "babaloo"
-
-*Out:* "oo", "loo", "aloo", "baloo"
-
 == ICU Tokenizer
 
 This tokenizer processes multilingual text and tokenizes it appropriately based on its script attribute.

@@ -20,7 +20,12 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
 import org.apache.solr.client.solrj.SolrRequest;
 
 public class NoneSuggester extends Suggester {
-  public static final NoneSuggester INSTANCE = new NoneSuggester();
+
+  public static NoneSuggester get(Policy.Session session) {
+    NoneSuggester suggester = new NoneSuggester();
+    suggester._init(session);
+    return suggester;
+  }
 
   @Override
   SolrRequest init() {

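Note: replacing the shared INSTANCE singleton with get(session) binds even the no-op suggester to the current Policy.Session via _init(session). A plausible reading, consistent with the new ComputePlanAction loop that keeps chaining sessions through suggester.getSession(), is that a stateless singleton could not carry that session, which made it one ingredient of the SOLR-11714 infinite-loop fix recorded in CHANGES above.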
@@ -273,7 +273,7 @@ public class PolicyHelper {
    *
    */
   private void returnSession(SessionWrapper sessionWrapper) {
-    TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
+    TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
     synchronized (lockObj) {
       sessionWrapper.status = Status.EXECUTING;
       log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),

@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.commons.math3.linear.RealMatrix;
+import org.apache.commons.math3.ml.clustering.CentroidCluster;
+import org.apache.commons.math3.ml.distance.EuclideanDistance;
+import org.apache.commons.math3.ml.clustering.FuzzyKMeansClusterer;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class FuzzyKmeansEvaluator extends RecursiveObjectEvaluator implements TwoValueWorker {
+  protected static final long serialVersionUID = 1L;
+
+
+  private int maxIterations = 1000;
+  private double fuzziness = 1.2;
+
+  public FuzzyKmeansEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
+    super(expression, factory);
+
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(namedParam.getName().equals("fuzziness")){
+        this.fuzziness = Double.parseDouble(namedParam.getParameter().toString().trim());
+      } else if(namedParam.getName().equals("maxIterations")) {
+        this.maxIterations = Integer.parseInt(namedParam.getParameter().toString().trim());
+      } else {
+        throw new IOException("Unexpected named parameter:"+namedParam.getName());
+      }
+    }
+  }
+
+  @Override
+  public Object doWork(Object value1, Object value2) throws IOException {
+
+
+    Matrix matrix = null;
+    int k = 0;
+
+
+    if(value1 instanceof Matrix) {
+      matrix = (Matrix)value1;
+    } else {
+      throw new IOException("The first parameter for fuzzyKmeans should be the observation matrix.");
+    }
+
+    if(value2 instanceof Number) {
+      k = ((Number)value2).intValue();
+    } else {
+      throw new IOException("The second parameter for fuzzyKmeans should be k.");
+    }
+
+    FuzzyKMeansClusterer<KmeansEvaluator.ClusterPoint> kmeans = new FuzzyKMeansClusterer(k,
+        fuzziness,
+        maxIterations,
+        new EuclideanDistance());
+    List<KmeansEvaluator.ClusterPoint> points = new ArrayList();
+    double[][] data = matrix.getData();
+
+    List<String> ids = matrix.getRowLabels();
+
+    for(int i=0; i<data.length; i++) {
+      double[] vec = data[i];
+      points.add(new KmeansEvaluator.ClusterPoint(ids.get(i), vec));
+    }
+
+    Map fields = new HashMap();
+
+    fields.put("k", k);
+    fields.put("fuzziness", fuzziness);
+    fields.put("distance", "euclidean");
+    fields.put("maxIterations", maxIterations);
+
+    List<CentroidCluster<KmeansEvaluator.ClusterPoint>> clusters = kmeans.cluster(points);
+    RealMatrix realMatrix = kmeans.getMembershipMatrix();
+    double[][] mmData = realMatrix.getData();
+    Matrix mmMatrix = new Matrix(mmData);
+    mmMatrix.setRowLabels(matrix.getRowLabels());
+    return new KmeansEvaluator.ClusterTuple(fields, clusters, matrix.getColumnLabels(),mmMatrix);
+  }
+}

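Note: this new evaluator is registered as fuzzyKmeans in StreamHandler earlier in this commit, next to getMembershipMatrix. A hypothetical streaming-expression call, assuming an observation matrix obs with row labels built upstream, is fuzzyKmeans(obs, 2, fuzziness=1.2, maxIterations=500); the resulting cluster tuple can then be handed to getMembershipMatrix(...) to read back the row-labeled membership matrix computed by the clusterer.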
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.eval;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class GetMembershipMatrixEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
+  private static final long serialVersionUID = 1;
+
+  public GetMembershipMatrixEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  @Override
+  public Object doWork(Object value) throws IOException {
+    if(!(value instanceof KmeansEvaluator.ClusterTuple)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for value, expecting a clustering result",toExpression(constructingFactory), value.getClass().getSimpleName()));
+    } else {
+      KmeansEvaluator.ClusterTuple clusterTuple = (KmeansEvaluator.ClusterTuple)value;
+      return clusterTuple.getMembershipMatrix();
+    }
+  }
+}

@@ -29,43 +29,46 @@ import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.commons.math3.ml.clustering.Clusterable;
 import org.apache.commons.math3.ml.clustering.KMeansPlusPlusClusterer;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
-public class KmeansEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
+public class KmeansEvaluator extends RecursiveObjectEvaluator implements TwoValueWorker {
   protected static final long serialVersionUID = 1L;
 
 
+  private int maxIterations = 1000;
+
   public KmeansEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
     super(expression, factory);
 
     List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
 
     for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(namedParam.getName().equals("maxIterations")) {
+        this.maxIterations = Integer.parseInt(namedParam.getParameter().toString().trim());
+      } else {
        throw new IOException("Unexpected named parameter:"+namedParam.getName());
+      }
     }
   }
 
   @Override
-  public Object doWork(Object... values) throws IOException {
-
-    if(values.length < 2) {
-      throw new IOException("kmeans expects atleast two parameters a Matrix of observations and k");
-    }
+  public Object doWork(Object value1, Object value2) throws IOException {
 
     Matrix matrix = null;
     int k = 0;
-    int maxIterations = 1000;
 
-    if(values[0] instanceof Matrix) {
-      matrix = (Matrix)values[0];
+    if(value1 instanceof Matrix) {
+      matrix = (Matrix)value1;
     } else {
       throw new IOException("The first parameter for kmeans should be the observation matrix.");
     }
 
-    if(values[1] instanceof Number) {
-      k = ((Number)values[1]).intValue();
+    if(value2 instanceof Number) {
+      k = ((Number)value2).intValue();
     } else {
       throw new IOException("The second parameter for kmeans should be k.");
     }
 
-    if(values.length == 3) {
-      maxIterations = ((Number)values[2]).intValue();
-    }
-
     KMeansPlusPlusClusterer<ClusterPoint> kmeans = new KMeansPlusPlusClusterer(k, maxIterations);
     List<ClusterPoint> points = new ArrayList();
@ -110,6 +113,7 @@ public class KmeansEvaluator extends RecursiveObjectEvaluator implements ManyVal
|
|||
|
||||
private List<String> columnLabels;
|
||||
private List<CentroidCluster<ClusterPoint>> clusters;
|
||||
private Matrix membershipMatrix;
|
||||
|
||||
public ClusterTuple(Map fields,
|
||||
List<CentroidCluster<ClusterPoint>> clusters,
|
||||
|
@ -119,6 +123,20 @@ public class KmeansEvaluator extends RecursiveObjectEvaluator implements ManyVal
|
|||
this.columnLabels = columnLabels;
|
||||
}
|
||||
|
||||
public ClusterTuple(Map fields,
|
||||
List<CentroidCluster<ClusterPoint>> clusters,
|
||||
List<String> columnLabels,
|
||||
Matrix membershipMatrix) {
|
||||
super(fields);
|
||||
this.clusters = clusters;
|
||||
this.columnLabels = columnLabels;
|
||||
this.membershipMatrix = membershipMatrix;
|
||||
}
|
||||
|
||||
public Matrix getMembershipMatrix() {
|
||||
return this.membershipMatrix;
|
||||
}
|
||||
|
||||
public List<String> getColumnLabels() {
|
||||
return this.columnLabels;
|
||||
}
|
||||
|
@ -126,10 +144,6 @@ public class KmeansEvaluator extends RecursiveObjectEvaluator implements ManyVal
|
|||
public List<CentroidCluster<ClusterPoint>> getClusters() {
|
||||
return this.clusters;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
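
With this change kmeans takes exactly two positional parameters (the observation matrix and k), and the iteration cap moves from an optional third positional argument to a named parameter; a sketch of the new call shape (the value 500 is illustrative):

    kmeans(e, 2, maxIterations=500)

Any other named parameter now fails fast with an IOException from the constructor above, rather than being accepted and ignored.
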
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.client.solrj.io.eval;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import org.apache.commons.math3.ml.clustering.KMeansPlusPlusClusterer;
import org.apache.commons.math3.ml.clustering.MultiKMeansPlusPlusClusterer;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class MultiKmeansEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
  protected static final long serialVersionUID = 1L;

  private int maxIterations = 1000;

  public MultiKmeansEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
    super(expression, factory);

    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);

    for(StreamExpressionNamedParameter namedParam : namedParams){
      if(namedParam.getName().equals("maxIterations")) {
        this.maxIterations = Integer.parseInt(namedParam.getParameter().toString().trim());
      } else {
        throw new IOException("Unexpected named parameter:"+namedParam.getName());
      }
    }
  }

  @Override
  public Object doWork(Object... values) throws IOException {

    if(values.length != 3) {
      throw new IOException("The multiKmeans function expects three parameters: a matrix to cluster, k, and the number of trials.");
    }

    Object value1 = values[0];
    Object value2 = values[1];
    Object value3 = values[2];

    Matrix matrix = null;
    int k = 0;
    int trials = 0;

    if(value1 instanceof Matrix) {
      matrix = (Matrix)value1;
    } else {
      throw new IOException("The first parameter for multiKmeans should be the observation matrix.");
    }

    if(value2 instanceof Number) {
      k = ((Number)value2).intValue();
    } else {
      throw new IOException("The second parameter for multiKmeans should be k.");
    }

    if(value3 instanceof Number) {
      trials = ((Number)value3).intValue();
    } else {
      throw new IOException("The third parameter for multiKmeans should be the number of trials.");
    }

    KMeansPlusPlusClusterer<KmeansEvaluator.ClusterPoint> kmeans = new KMeansPlusPlusClusterer(k, maxIterations);
    MultiKMeansPlusPlusClusterer multiKmeans = new MultiKMeansPlusPlusClusterer(kmeans, trials);

    List<KmeansEvaluator.ClusterPoint> points = new ArrayList();
    double[][] data = matrix.getData();

    List<String> ids = matrix.getRowLabels();

    for(int i = 0; i < data.length; i++) {
      double[] vec = data[i];
      points.add(new KmeansEvaluator.ClusterPoint(ids.get(i), vec));
    }

    Map fields = new HashMap();

    fields.put("k", k);
    fields.put("trials", trials);
    fields.put("distance", "euclidean");
    fields.put("maxIterations", maxIterations);

    return new KmeansEvaluator.ClusterTuple(fields, multiKmeans.cluster(points), matrix.getColumnLabels());
  }
}
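
multiKmeans delegates to Commons Math's MultiKMeansPlusPlusClusterer, which runs the wrapped k-means++ clusterer once per trial and keeps the best run as judged by its cluster evaluator (lowest intra-cluster variance by default), so a larger trial count trades time for robustness against unlucky random seeds. The factory wiring is not part of this diff; evaluators of this kind are typically registered along these lines (the function-name/class pairing below is inferred from the classes above, not taken from this commit):

    StreamFactory factory = new StreamFactory()
        .withFunctionName("multiKmeans", MultiKmeansEvaluator.class)
        .withFunctionName("getMembershipMatrix", GetMembershipMatrixEvaluator.class);
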
@@ -68,6 +68,7 @@ public interface AutoScalingParams {
   String TRIGGER_SCHEDULE_DELAY_SECONDS = "triggerScheduleDelaySeconds";
   String TRIGGER_COOLDOWN_PERIOD_SECONDS = "triggerCooldownPeriodSeconds";
   String TRIGGER_CORE_POOL_SIZE = "triggerCorePoolSize";
+  String MAX_COMPUTE_OPERATIONS = "maxComputeOperations";
 
   @Deprecated
   String ACTION_THROTTLE_PERIOD_SECONDS = "actionThrottlePeriodSeconds";
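
The new MAX_COMPUTE_OPERATIONS key caps how many operations ComputePlanAction is willing to compute for a single trigger event. Assuming it is stored alongside its neighbors in the autoscaling configuration's properties map and set through the set-properties command (the value 500 is illustrative), the payload for a POST to /solr/admin/autoscaling would look like:

    {
      "set-properties": {
        "maxComputeOperations": 500
      }
    }
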
@@ -7076,6 +7076,192 @@ public class StreamExpressionTest extends SolrCloudTestCase {
    }
  }

  @Test
  public void testMultiKmeans() throws Exception {
    String cexpr = "let(echo=true," +
                   "    a=array(1,1,1,0,0,0)," +
                   "    b=array(1,1,1,0,0,0)," +
                   "    c=array(0,0,0,1,1,1)," +
                   "    d=array(0,0,0,1,1,1)," +
                   "    e=setRowLabels(matrix(a,b,c,d), " +
                   "                   array(doc1, doc2, doc3, doc4))," +
                   "    f=multiKmeans(e, 2, 5)," +
                   "    g=getCluster(f, 0)," +
                   "    h=getCluster(f, 1)," +
                   "    i=getCentroids(f)," +
                   "    j=getRowLabels(g)," +
                   "    k=getRowLabels(h))";
    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
    paramsLoc.set("expr", cexpr);
    paramsLoc.set("qt", "/stream");
    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
    TupleStream solrStream = new SolrStream(url, paramsLoc);
    StreamContext context = new StreamContext();
    solrStream.setStreamContext(context);
    List<Tuple> tuples = getTuples(solrStream);
    assertTrue(tuples.size() == 1);
    List<List<Number>> cluster1 = (List<List<Number>>)tuples.get(0).get("g");
    List<List<Number>> cluster2 = (List<List<Number>>)tuples.get(0).get("h");
    List<List<Number>> centroids = (List<List<Number>>)tuples.get(0).get("i");
    List<String> labels1 = (List<String>)tuples.get(0).get("j");
    List<String> labels2 = (List<String>)tuples.get(0).get("k");

    assertEquals(cluster1.size(), 2);
    assertEquals(cluster2.size(), 2);
    assertEquals(centroids.size(), 2);

    // Assert that the docs are not in both clusters
    assertTrue(!(labels1.contains("doc1") && labels2.contains("doc1")));
    assertTrue(!(labels1.contains("doc2") && labels2.contains("doc2")));
    assertTrue(!(labels1.contains("doc3") && labels2.contains("doc3")));
    assertTrue(!(labels1.contains("doc4") && labels2.contains("doc4")));

    // Assert that (doc1 and doc2) or (doc3 and doc4) are in labels1
    assertTrue((labels1.contains("doc1") && labels1.contains("doc2")) ||
        (labels1.contains("doc3") && labels1.contains("doc4")));

    // Assert that (doc1 and doc2) or (doc3 and doc4) are in labels2
    assertTrue((labels2.contains("doc1") && labels2.contains("doc2")) ||
        (labels2.contains("doc3") && labels2.contains("doc4")));

    if(labels1.contains("doc1")) {
      assertEquals(centroids.get(0).get(0).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(0).get(1).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(0).get(2).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(0).get(3).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(0).get(4).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(0).get(5).doubleValue(), 0.0, 0.0);

      assertEquals(centroids.get(1).get(0).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(1).get(1).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(1).get(2).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(1).get(3).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(1).get(4).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(1).get(5).doubleValue(), 1.0, 0.0);
    } else {
      assertEquals(centroids.get(0).get(0).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(0).get(1).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(0).get(2).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(0).get(3).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(0).get(4).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(0).get(5).doubleValue(), 1.0, 0.0);

      assertEquals(centroids.get(1).get(0).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(1).get(1).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(1).get(2).doubleValue(), 1.0, 0.0);
      assertEquals(centroids.get(1).get(3).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(1).get(4).doubleValue(), 0.0, 0.0);
      assertEquals(centroids.get(1).get(5).doubleValue(), 0.0, 0.0);
    }
  }
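
Note the shape of the assertions above: k-means cluster ordering is not deterministic, so the test pins down only that doc1/doc2 and doc3/doc4 land together, that no document lands in both clusters, and that the centroids match whichever ordering actually occurred. The fuzzy variant below follows the same pattern.
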
  @Test
  public void testFuzzyKmeans() throws Exception {
    String cexpr = "let(echo=true," +
                   "    a=array(1,1,1,0,0,0)," +
                   "    b=array(1,1,1,0,0,0)," +
                   "    c=array(0,0,0,1,1,1)," +
                   "    d=array(0,0,0,1,1,1)," +
                   "    e=setRowLabels(matrix(a,b,c,d), " +
                   "                   array(doc1, doc2, doc3, doc4))," +
                   "    f=fuzzyKmeans(e, 2)," +
                   "    g=getCluster(f, 0)," +
                   "    h=getCluster(f, 1)," +
                   "    i=getCentroids(f)," +
                   "    j=getRowLabels(g)," +
                   "    k=getRowLabels(h)," +
                   "    l=getMembershipMatrix(f))";
    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
    paramsLoc.set("expr", cexpr);
    paramsLoc.set("qt", "/stream");
    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
    TupleStream solrStream = new SolrStream(url, paramsLoc);
    StreamContext context = new StreamContext();
    solrStream.setStreamContext(context);
    List<Tuple> tuples = getTuples(solrStream);
    assertTrue(tuples.size() == 1);
    List<List<Number>> cluster1 = (List<List<Number>>)tuples.get(0).get("g");
    List<List<Number>> cluster2 = (List<List<Number>>)tuples.get(0).get("h");
    List<List<Number>> centroids = (List<List<Number>>)tuples.get(0).get("i");
    List<List<Number>> membership = (List<List<Number>>)tuples.get(0).get("l");

    List<String> labels1 = (List<String>)tuples.get(0).get("j");
    List<String> labels2 = (List<String>)tuples.get(0).get("k");

    assertEquals(cluster1.size(), 2);
    assertEquals(cluster2.size(), 2);
    assertEquals(centroids.size(), 2);

    // Assert that the docs are not in both clusters
    assertTrue(!(labels1.contains("doc1") && labels2.contains("doc1")));
    assertTrue(!(labels1.contains("doc2") && labels2.contains("doc2")));
    assertTrue(!(labels1.contains("doc3") && labels2.contains("doc3")));
    assertTrue(!(labels1.contains("doc4") && labels2.contains("doc4")));

    // Assert that (doc1 and doc2) or (doc3 and doc4) are in labels1
    assertTrue((labels1.contains("doc1") && labels1.contains("doc2")) ||
        (labels1.contains("doc3") && labels1.contains("doc4")));

    // Assert that (doc1 and doc2) or (doc3 and doc4) are in labels2
    assertTrue((labels2.contains("doc1") && labels2.contains("doc2")) ||
        (labels2.contains("doc3") && labels2.contains("doc4")));

    if(labels1.contains("doc1")) {
      assertEquals(centroids.get(0).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(0).get(1).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(0).get(2).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(0).get(3).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(0).get(4).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(0).get(5).doubleValue(), 0.0, 0.001);

      assertEquals(centroids.get(1).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(1).get(1).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(1).get(2).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(1).get(3).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(1).get(4).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(1).get(5).doubleValue(), 1.0, 0.001);

      // Assert the membership matrix
      assertEquals(membership.get(0).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(0).get(1).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(1).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(1).get(1).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(2).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(2).get(1).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(3).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(3).get(1).doubleValue(), 1.0, 0.001);

    } else {
      assertEquals(centroids.get(0).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(0).get(1).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(0).get(2).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(0).get(3).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(0).get(4).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(0).get(5).doubleValue(), 1.0, 0.001);

      assertEquals(centroids.get(1).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(1).get(1).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(1).get(2).doubleValue(), 1.0, 0.001);
      assertEquals(centroids.get(1).get(3).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(1).get(4).doubleValue(), 0.0, 0.001);
      assertEquals(centroids.get(1).get(5).doubleValue(), 0.0, 0.001);

      // Assert the membership matrix
      assertEquals(membership.get(0).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(0).get(1).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(1).get(0).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(1).get(1).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(2).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(2).get(1).doubleValue(), 0.0, 0.001);
      assertEquals(membership.get(3).get(0).doubleValue(), 1.0, 0.001);
      assertEquals(membership.get(3).get(1).doubleValue(), 0.0, 0.001);
    }
  }
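
The membership checks above are the crisp limit of the standard fuzzy c-means membership weight, which for fuzziness m > 1 assigns observation x_i a weight in cluster j of (textbook definition, not taken from this diff):

    u_{ij} = \left( \sum_{l=1}^{k} \left( \frac{\lVert x_i - c_j \rVert}{\lVert x_i - c_l \rVert} \right)^{2/(m-1)} \right)^{-1}

With the well-separated rows used in the test, the weights collapse to nearly 0 or 1, which is why a 0.001 delta is enough to pin each document to one column of the membership matrix.
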
  @Test
  public void testEBEMultiply() throws Exception {
    String cexpr = "ebeMultiply(array(2,4,6,8,10,12),array(1,2,3,4,5,6))";
@@ -1,3 +1,21 @@
+/*
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+*/
 #cluster-suggestions
 .s-container{
   text-align:center;
@@ -1,3 +1,19 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
 solrAdminApp.controller('ClusterSuggestionsController',
   function($scope,$http) {
     $scope.data={}
@@ -26,10 +42,10 @@ function($scope,$http) {
         x.loading = false;
         x.done = true;
         x.run=true;
-        $scope.msg = "Command Submitted Successfully!";
+        $scope.msg = "Post Data Submitted Successfully!";
       }, function (response) {
         x.failed=true;
-        $scope.msg = "Service does not exist";
+        $scope.msg = "Service not Exists";
         $scope.statusval = response.status;
         $scope.statustext = response.statusText;
         $scope.headers = response.headers();