SOLR-9735: Initial port of autoscaling work for Solr 7

Shalin Shekhar Mangar 2017-05-30 14:59:25 +05:30
parent cb97ad787a
commit e5d8ed397a
44 changed files with 3454 additions and 98 deletions

View File

@ -64,7 +64,7 @@ Upgrading from Solr 6.x
registries as hierarchical MBeans. This behavior can be also disabled by specifying a SolrJmxReporter
configuration with a boolean init arg "enabled" set to "false". For a more fine-grained control users
should explicitly specify at least one SolrJmxReporter configuration.
* The sow (split-on-whitespace) request param now defaults to false (true in previous versions).
This affects the edismax and standard/"lucene" query parsers: if the sow param is not specified,
query text will not be split on whitespace before analysis. See
@ -102,19 +102,25 @@ New Features
* SOLR-10431: Make it possible to invoke v2 api calls using SolrJ (Cao Manh Dat, Noble Paul, shalin)
* SOLR-10233: Add support for different replica types that can handle updates differently:
- NRT: Writes updates to transaction log and indexes locally. Replicas of type "NRT" support NRT
(soft commits) and RTG. Any NRT replica can become a leader. This was the only type supported
in SolrCloud until now and it's the default type.
- TLOG: Writes to transaction log, but not to index, uses replication to copy segment files from the
shard leader. Any TLOG replica can become leader (by first applying all local transaction log
elements). If a replica is of type TLOG but is also the leader, it will behave as an NRT. This
is exactly what was added in SOLR-9835 (non-realtime replicas), just the API and naming changes.
- PULL: Doesn't index or write to transaction log, just replicates from the shard leader. PULL replicas
can't become shard leaders (i.e., if there are only PULL replicas in the collection at some point,
updates will fail the same as if there is no leader, but queries continue to work), so they don't even
participate in elections.
(Tomás Fernández Löbbe)
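As a quick, hedged illustration (not part of this commit's diff): with SolrJ the per-type replica
counts can be requested at collection-creation time. The overload used below and the collection
and configset names are assumptions made for the sketch.

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

// sketch: "products" with 2 shards, each carrying 1 NRT, 1 TLOG and 2 PULL replicas
// (args: collection, configset, numShards, nrtReplicas, tlogReplicas, pullReplicas)
try (CloudSolrClient client = new CloudSolrClient.Builder().withZkHost("localhost:9983").build()) {
CollectionAdminRequest.createCollection("products", "conf", 2, 1, 1, 2).process(client);
}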
* SOLR-10373: Implement read API for autoscaling configuration at /admin/autoscaling or
/cluster/autoscaling paths. (shalin)
* SOLR-10677: Expose a diagnostics API to return nodes sorted by load in descending order and
any policy violations. (shalin)
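A short sketch of exercising the read side with SolrJ's V2Request (this mirrors the
AutoScalingHandlerTest added further down in this commit; cloudSolrClient is assumed to be an
already-connected CloudSolrClient):

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.util.NamedList;

// read back the stored autoscaling configuration
NamedList<Object> conf = cloudSolrClient.request(
new V2Request.Builder("/cluster/autoscaling").withMethod(SolrRequest.METHOD.GET).build());
// read the diagnostics view: nodes sorted by cluster preferences, plus any policy violations
NamedList<Object> diag = cloudSolrClient.request(
new V2Request.Builder("/cluster/autoscaling/diagnostics").withMethod(SolrRequest.METHOD.GET).build());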
Bug Fixes
----------------------
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
@ -198,6 +204,8 @@ Other Changes
* SOLR-10755: delete/refactor many solrj deprecations (hossman)
* SOLR-10764: AutoScalingHandler should validate policy and preferences before updating zookeeper. (shalin)
================== 6.7.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
@ -230,7 +238,7 @@ New Features
* SOLR-10721: Provide a way to know when Core Discovery is finished and when all async cores are done loading
(Erick Erickson)
* SOLR-10379: Add ManagedSynonymGraphFilterFactory, deprecate ManagedSynonymFilterFactory. (Steve Rowe)
* SOLR-10479: Adds support for HttpShardHandlerFactory.loadBalancerRequests(MinimumAbsolute|MaximumFraction)
@ -257,8 +265,8 @@ Other Changes
* SOLR-10400: Replace (instanceof TrieFooField || instanceof FooPointField) constructs with
FieldType.getNumberType() or SchemaField.getSortField() where appropriate. (hossman, Steve Rowe)
* SOLR-10438: Assign explicit useDocValuesAsStored values to all points field types in
schema-point.xml/TestPointFields. (hossman, Steve Rowe)
* LUCENE-7705: Allow CharTokenizer-derived tokenizers and KeywordTokenizer to configure the max token length.
@ -335,7 +343,7 @@ New Features
* SOLR-10507: Core Admin status command to emit collection details of each core (noble)
* SOLR-10521: introducing sort=childfield(field) asc for searching by {!parent} (Mikhail Khludnev)
* SOLR-9596: Add Solr support for SimpleTextCodec, via <codecFactory class="solr.SimpleTextCodecFactory"/>
in solrconfig.xml (per-field specification in the schema is not possible). (Steve Rowe)
@ -394,7 +402,7 @@ Optimizations
* SOLR-10499: facet.heatmap is now significantly faster when the docset (base query) matches everything and there are no
deleted docs. It's also faster when the docset matches a small fraction of the index or none. (David Smiley)
* SOLR-9217: Reduced heap consumption for filter({!join ... score=...})
(Andrey Kudryavtsev, Gopikannan Venugopalsamy via Mikhail Khludnev)
* SOLR-10548: JSON Facet API now uses hyper-log-log++ for determining the number of buckets
@ -423,8 +431,8 @@ Bug Fixes
* SOLR-10264: Fixes multi-term synonym parsing in ManagedSynonymFilterFactory.
(Jörg Rathlev, Steve Rowe, Christine Poerschke)
* SOLR-8807: fix Spellcheck "collateMaxCollectDocs" parameter to work with queries that have the
CollapsingQParserPlugin applied. (James Dyer)
* SOLR-10474: TestPointFields.testPointFieldReturn() depends on order of unsorted hits. (Steve Rowe)
@ -434,7 +442,7 @@ Bug Fixes
* SOLR-10047: Mismatched Docvalues segments cause exception in Sorting/Faceting. Solr now uninverts per segment
to avoid such exceptions. (Keith Laban via shalin)
* SOLR-10472: Fixed uninversion (aka: FieldCache) bugs with the numeric PointField classes, and CurrencyField (hossman)
* SOLR-5127: Multiple highlight fields and wildcards are now supported e.g. hl.fl=title,text_*
@ -446,13 +454,13 @@ Bug Fixes
when there was a mincount > 1. This has been corrected by changing numBuckets cardinality processing to
ignore mincount > 1 for non-distributed requests. (yonik)
* SOLR-10520: child.facet.field doubled counts at least when rows>0. (Dr. Oleg Savrasov via Mikhail Khludnev)
* SOLR-10480: Full pagination in JSON Facet API using offset does not work. (yonik)
* SOLR-10526: facet.heatmap didn't honor facet exclusions ('ex') for distributed search. (David Smiley)
* SOLR-10500: nested child docs are adopted by neighbour when several parents come in update/json/docs
(Alexey Suprun, noble via Mikhail Khludnev)
* SOLR-10316: Unloading a core can remove a ZK SolrCore registration entry for the wrong SolrCore. (Mark Miller)
@ -478,7 +486,7 @@ Bug Fixes
and accept "TO" as endpoints in range queries. (hossman, Steve Rowe)
* SOLR-10735: Windows script (solr.cmd) didn't work properly with directory containing spaces. Adding quotations
to fix (Uwe Schindler, janhoy, Tomas Fernandez-Lobbe, Ishan Chattopadhyaya)
Ref Guide
----------------------
@ -578,7 +586,7 @@ Bug Fixes
* SOLR-10404: The fetch() streaming expression wouldn't work if a value included query syntax chars (like :+-).
Fixed, and enhanced the generated query to not pollute the queryCache. (David Smiley)
* SOLR-10423: Disable graph query production via schema configuration <fieldtype ... enableGraphQueries="false">.
This fixes broken queries for ShingleFilter-containing query-time analyzers when request param sow=false.
(Steve Rowe)

View File

@ -668,6 +668,7 @@ public class ZkController {
byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
// pre-create the autoscaling config znode with empty JSON so readers/writers need not handle NoNode
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {

View File

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.api.Api;
import org.apache.solr.api.ApiBag;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommandOperation;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CommonParams.JSON;
/**
* Handler for /cluster/autoscaling
*/
public class AutoScalingHandler extends RequestHandlerBase implements PermissionNameProvider {
public static final String HANDLER_PATH = "/admin/autoscaling";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static ImmutableSet<String> singletonCommands = ImmutableSet.of("set-cluster-preferences", "set-cluster-policy");
protected final CoreContainer container;
private final List<Map<String, String>> DEFAULT_ACTIONS = new ArrayList<>(3);
public AutoScalingHandler(CoreContainer container) {
this.container = container;
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
try {
String httpMethod = (String) req.getContext().get("httpMethod");
RequestHandlerUtils.setWt(req, JSON);
if ("GET".equals(httpMethod)) {
String path = (String) req.getContext().get("path");
if (path == null) path = "/cluster/autoscaling";
List<String> parts = StrUtils.splitSmart(path, '/');
if (parts.get(0).isEmpty()) parts.remove(0);
if (parts.size() < 2 || parts.size() > 3) {
// invalid
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown path: " + path);
}
Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
if (parts.size() == 2) {
rsp.getValues().addAll(map);
} else if (parts.size() == 3 && "diagnostics".equals(parts.get(2))) {
handleDiagnostics(rsp, map);
}
} else {
if (req.getContentStreams() == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No commands specified for autoscaling");
}
List<CommandOperation> ops = CommandOperation.readCommands(req.getContentStreams(), rsp.getValues(), singletonCommands);
if (ops == null) {
// errors have already been added to the response so there's nothing left to do
return;
}
for (CommandOperation op : ops) {
switch (op.name) {
case "set-policy":
handleSetPolicies(req, rsp, op);
break;
case "remove-policy":
handleRemovePolicy(req, rsp, op);
break;
case "set-cluster-preferences":
handleSetClusterPreferences(req, rsp, op);
break;
case "set-cluster-policy":
handleSetClusterPolicy(req, rsp, op);
break;
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + op.name);
}
}
}
} catch (Exception e) {
rsp.getValues().add("result", "failure");
throw e;
} finally {
RequestHandlerUtils.addExperimentalFormatWarning(rsp);
}
}
private void handleDiagnostics(SolrQueryResponse rsp, Map<String, Object> autoScalingConf) throws IOException {
Policy policy = new Policy(autoScalingConf);
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
Policy.Session session = policy.createSession(new SolrClientDataProvider(cloudSolrClient));
List<Row> sorted = session.getSorted();
List<Clause.Violation> violations = session.getViolations();
List<Preference> clusterPreferences = policy.getClusterPreferences();
List<Map<String, Object>> sortedNodes = new ArrayList<>(sorted.size());
for (Row row : sorted) {
Map<String, Object> map = Utils.makeMap("node", row.node);
for (Cell cell : row.cells) {
for (Preference clusterPreference : clusterPreferences) {
Policy.SortParam name = clusterPreference.name;
if (cell.name.equalsIgnoreCase(name.name())) {
map.put(name.name(), cell.val);
break;
}
}
}
sortedNodes.add(map);
}
Map<String, Object> map = new HashMap<>(2);
map.put("sortedNodes", sortedNodes);
map.put("violations", violations);
rsp.getValues().add("diagnostics", map);
}
}
private void handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
Object data = op.getCommandData();
if (!(data instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster policies was not found");
}
List clusterPolicy = (List) data;
zkSetClusterPolicy(container.getZkController().getZkStateReader(), clusterPolicy);
rsp.getValues().add("result", "success");
}
private void handleSetClusterPreferences(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
Object data = op.getCommandData();
if (!(data instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster preferences was not found");
}
List preferences = (List) data;
zkSetPreferences(container.getZkController().getZkStateReader(), preferences);
rsp.getValues().add("result", "success");
}
private void handleRemovePolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
String policyName = (String) op.getCommandData();
if (policyName == null || policyName.trim().isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be null or empty");
}
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> policies = (Map<String, Object>) autoScalingConf.get("policies");
if (policies == null || !policies.containsKey(policyName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No policy exists with name: " + policyName);
}
zkSetPolicies(container.getZkController().getZkStateReader(), policyName, null);
rsp.getValues().add("result", "success");
}
private void handleSetPolicies(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
Map<String, Object> policies = op.getDataMap();
for (Map.Entry<String, Object> policy : policies.entrySet()) {
String policyName = policy.getKey();
if (policyName == null || policyName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be null or empty");
}
}
zkSetPolicies(container.getZkController().getZkStateReader(), null, policies);
rsp.getValues().add("result", "success");
}
private void zkSetPolicies(ZkStateReader reader, String policyToBeRemoved, Map<String, Object> newPolicies) throws KeeperException, InterruptedException, IOException {
// optimistic concurrency: read the config along with its ZK version, modify and verify it, then
// write it back conditioned on that version; BadVersionException means a concurrent writer got
// there first, so re-read and retry
while (true) {
Stat stat = new Stat();
byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
Map<String, Object> policies = (Map<String, Object>) loaded.get("policies");
if (policies == null) policies = new HashMap<>(1);
if (newPolicies != null) {
policies.putAll(newPolicies);
} else {
policies.remove(policyToBeRemoved);
}
loaded = loaded.plus("policies", policies);
verifyAutoScalingConf(loaded.getProperties());
try {
reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
} catch (KeeperException.BadVersionException bve) {
// somebody else has changed the configuration so we must retry
continue;
}
break;
}
}
private void zkSetPreferences(ZkStateReader reader, List preferences) throws KeeperException, InterruptedException, IOException {
while (true) {
Stat stat = new Stat();
byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
loaded = loaded.plus("cluster-preferences", preferences);
verifyAutoScalingConf(loaded.getProperties());
try {
reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
} catch (KeeperException.BadVersionException bve) {
// somebody else has changed the configuration so we must retry
continue;
}
break;
}
}
private void zkSetClusterPolicy(ZkStateReader reader, List clusterPolicy) throws KeeperException, InterruptedException, IOException {
while (true) {
Stat stat = new Stat();
byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
loaded = loaded.plus("cluster-policy", clusterPolicy);
verifyAutoScalingConf(loaded.getProperties());
try {
reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
} catch (KeeperException.BadVersionException bve) {
// somebody else has changed the configuration so we must retry
continue;
}
break;
}
}
private void verifyAutoScalingConf(Map<String, Object> autoScalingConf) throws IOException {
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
// building a session against live cluster data validates the config: a bad policy or
// preference surfaces as an exception here, before anything is written to ZooKeeper
Policy policy = new Policy(autoScalingConf);
policy.createSession(new SolrClientDataProvider(cloudSolrClient));
log.debug("Verified autoscaling configuration");
}
}
private Map<String, Object> zkReadAutoScalingConf(ZkStateReader reader) throws KeeperException, InterruptedException {
byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
return loaded.getProperties();
}
@Override
public String getDescription() {
return "A handler for autoscaling configuration";
}
@Override
public Name getPermissionName(AuthorizationContext request) {
switch (request.getHttpMethod()) {
case "GET":
return Name.AUTOSCALING_READ_PERM;
case "POST":
return Name.AUTOSCALING_WRITE_PERM;
default:
return null;
}
}
@Override
public Collection<Api> getApis() {
return ApiBag.wrapRequestHandlers(this, "autoscaling.Commands");
}
@Override
public Boolean registerV2() {
return Boolean.TRUE;
}
@Override
public SolrRequestHandler getSubHandler(String path) {
if (path.equals("/diagnostics")) return this;
return null;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package for classes related to autoscaling
*/
package org.apache.solr.cloud.autoscaling;

View File

@ -74,13 +74,12 @@ public class ServerSnitchContext extends SnitchContext {
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
if (callback == null) callback = this;
String url = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
params.add("class", klas);
params.add(ACTION, INVOKE.toString());
//todo batch all requests to the same server
try {
SimpleSolrResponse rsp = invoke(coreContainer.getUpdateShardHandler(), url, CommonParams.CORES_HANDLER_PATH, params);
SimpleSolrResponse rsp = invoke(node, CommonParams.CORES_HANDLER_PATH, params);
Map<String, Object> returnedVal = (Map<String, Object>) rsp.getResponse().get(klas);
if(exception == null){
// log this
@ -94,8 +93,10 @@ public class ServerSnitchContext extends SnitchContext {
}
}
public SimpleSolrResponse invoke(UpdateShardHandler shardHandler, final String url, String path, SolrParams params)
public SimpleSolrResponse invoke(String solrNode, String path, SolrParams params)
throws IOException, SolrServerException {
String url = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(solrNode);
UpdateShardHandler shardHandler = coreContainer.getUpdateShardHandler();
GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
try (HttpSolrClient client = new HttpSolrClient.Builder(url).withHttpClient(shardHandler.getHttpClient())
.withResponseParser(new BinaryResponseParser()).build()) {

View File

@ -60,6 +60,7 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.AuthSchemeRegistryProvider;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.CredentialsProviderProvider;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
@ -150,7 +151,7 @@ public class CoreContainer {
private UpdateShardHandler updateShardHandler;
private TransientSolrCoreCacheFactory transientCoreCache;
private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
@ -193,6 +194,8 @@ public class CoreContainer {
public final static long INITIAL_CORE_LOAD_COMPLETE = 0x4L;
private volatile long status = 0L;
protected AutoScalingHandler autoScalingHandler;
private enum CoreInitFailedAction { fromleader, none }
/**
@ -528,6 +531,9 @@ public class CoreContainer {
metricsCollectorHandler = createHandler(MetricsCollectorHandler.HANDLER_PATH, MetricsCollectorHandler.class.getName(), MetricsCollectorHandler.class);
// may want to add some configuration here in the future
metricsCollectorHandler.init(null);
autoScalingHandler = createHandler(AutoScalingHandler.HANDLER_PATH, AutoScalingHandler.class.getName(), AutoScalingHandler.class);
containerHandlers.put(AUTHZ_PATH, securityConfHandler);
securityConfHandler.initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), AUTHZ_PATH);
containerHandlers.put(AUTHC_PATH, securityConfHandler);
@ -587,7 +593,7 @@ public class CoreContainer {
}
checkForDuplicateCoreNames(cds);
status |= CORE_DISCOVERY_COMPLETE;
for (final CoreDescriptor cd : cds) {
if (cd.isTransient() || !cd.isLoadOnStartup()) {
getTransientCacheHandler().addTransientDescriptor(cd.getName(), cd);
@ -663,7 +669,7 @@ public class CoreContainer {
}
return transientCoreCache.getTransientSolrCoreCache();
}
public void securityNodeChanged() {
log.info("Security node changed, reloading security.json");
reloadSecurityProperties();
@ -832,7 +838,7 @@ public class CoreContainer {
if( core == null ) {
throw new RuntimeException( "Can not register a null core." );
}
if (isShutDown) {
core.close();
throw new IllegalStateException("This CoreContainer has been closed");
@ -1114,7 +1120,7 @@ public class CoreContainer {
/**
* get a list of all the cores that are currently loaded
* @return a list of all the available core names in either permanent or transient core lists.
*
* Note: this implies that the core is loaded
*/
public Collection<String> getAllCoreNames() {
@ -1163,12 +1169,12 @@ public class CoreContainer {
if (ret == null) {
oldDesc.loadExtraProperties(); // there may be changes to extra properties that we need to pick up.
return oldDesc;
}
// The CloudDescriptor bit here is created in a very convoluted way, requiring access to private methods
// in ZkController. When reloading, this behavior is identical to what used to happen where a copy of the old
// CoreDescriptor was just re-used.
if (ret.getCloudDescriptor() != null) {
ret.getCloudDescriptor().reload(oldDesc.getCloudDescriptor());
}
@ -1186,7 +1192,7 @@ public class CoreContainer {
public void reload(String name) {
SolrCore core = solrCores.getCoreFromAnyList(name, false);
if (core != null) {
// The underlying core properties files may have changed, we don't really know. So we have a (perhaps) stale
// CoreDescriptor and we need to reload it from the disk files
CoreDescriptor cd = reloadCoreDescriptor(core.getCoreDescriptor());
@ -1206,7 +1212,7 @@ public class CoreContainer {
if (!cd.getCloudDescriptor().isLeader()) {
getZkController().startReplicationFromLeader(newCore.getName(), true);
}
}
}
} catch (SolrCoreState.CoreIsClosedException e) {
@ -1293,7 +1299,7 @@ public class CoreContainer {
// cancel recovery in cloud mode
core.getSolrCoreState().cancelRecovery();
if (core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.PULL
|| core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
// Stop replication if this is part of a pull/tlog replica before closing the core
zkSys.getZkController().stopReplicationFromLeader(name);
}
@ -1385,10 +1391,10 @@ public class CoreContainer {
// This is a bit of awkwardness where SolrCloud and transient cores don't play nice together. For transient cores,
// we have to allow them to be created at any time there hasn't been a core load failure (use reload to cure that).
// But for TestConfigSetsAPI.testUploadWithScriptUpdateProcessor, this needs to _not_ try to load the core if
// the core is null and there was an error. If you change this, be sure to run both TestConfigSetsAPI and
// TestLazyCores
if (desc == null || zkSys.getZkController() != null) return null;
// This will put an entry in pending core ops if the core isn't loaded
core = solrCores.waitAddPendingCoreOps(name);

View File

@ -90,7 +90,8 @@ public class SolrResourceLoader implements ResourceLoader,Closeable
static final String[] packages = {
"", "analysis.", "schema.", "handler.", "search.", "update.", "core.", "response.", "request.",
"update.processor.", "util.", "spelling.", "handler.component.", "handler.dataimport.",
"spelling.suggest.", "spelling.suggest.fst.", "rest.schema.analysis.", "security.","handler.admin."
"spelling.suggest.", "spelling.suggest.fst.", "rest.schema.analysis.", "security.","handler.admin.",
"cloud.autoscaling."
};
private static final java.lang.String SOLR_CORE_NAME = "solr.core.name";
private static Set<String> loggedOnce = new ConcurrentSkipListSet<>();

View File

@ -80,7 +80,7 @@ public class BlobHandler extends RequestHandlerBase implements PluginInfoInitial
public void handleRequestBody(final SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
String httpMethod = req.getHttpMethod();
String path = (String) req.getContext().get("path");
SolrConfigHandler.setWt(req, JSON);
RequestHandlerUtils.setWt(req, JSON);
List<String> pieces = StrUtils.splitSmart(path, '/');
String blobName = null;

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.*;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
@ -119,4 +120,16 @@ public class RequestHandlerUtils
}
return false;
}
/**
* @since 6.7
*/
public static void setWt(SolrQueryRequest req, String wt) {
SolrParams params = req.getParams();
if (params.get(CommonParams.WT) != null) return;//wt is set by user
Map<String, String> map = new HashMap<>(1);
map.put(CommonParams.WT, wt);
map.put("indent", "true");
req.setParams(SolrParams.wrapDefaults(params, new MapSolrParams(map)));
}
}

View File

@ -76,7 +76,7 @@ public class SchemaHandler extends RequestHandlerBase implements SolrCoreAware,
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrConfigHandler.setWt(req, JSON);
RequestHandlerUtils.setWt(req, JSON);
String httpMethod = (String) req.getContext().get("httpMethod");
if ("POST".equals(httpMethod)) {
if (isImmutableConfigSet) {

View File

@ -124,7 +124,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
setWt(req, CommonParams.JSON);
RequestHandlerUtils.setWt(req, CommonParams.JSON);
String httpMethod = (String) req.getContext().get("httpMethod");
Command command = new Command(req, rsp, httpMethod);
if ("POST".equals(httpMethod)) {
@ -673,15 +673,6 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
return null;
}
public static void setWt(SolrQueryRequest req, String wt) {
SolrParams params = req.getParams();
if (params.get(CommonParams.WT) != null) return;//wt is set by user
Map<String, String> map = new HashMap<>(1);
map.put(CommonParams.WT, wt);
map.put("indent", "true");
req.setParams(SolrParams.wrapDefaults(params, new MapSolrParams(map)));
}
@Override
public SolrRequestHandler getSubHandler(String path) {
if (subPaths.contains(path)) return this;

View File

@ -34,7 +34,7 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.SolrConfigHandler;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthenticationPlugin;
@ -74,7 +74,7 @@ public abstract class SecurityConfHandler extends RequestHandlerBase implements
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrConfigHandler.setWt(req, CommonParams.JSON);
RequestHandlerUtils.setWt(req, CommonParams.JSON);
String httpMethod = (String) req.getContext().get("httpMethod");
String path = (String) req.getContext().get("path");
String key = path.substring(path.lastIndexOf('/')+1);

View File

@ -26,6 +26,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
@ -144,6 +147,8 @@ public abstract class TextResponseWriter implements PushWriter {
writeNumber(name, (Number) val);
} else if (val instanceof Boolean) {
writeBool(name, (Boolean) val);
} else if (val instanceof AtomicBoolean) {
writeBool(name, ((AtomicBoolean) val).get());
} else if (val instanceof Date) {
writeDate(name, (Date) val);
} else if (val instanceof Document) {
@ -221,13 +226,17 @@ public abstract class TextResponseWriter implements PushWriter {
} else if (val instanceof Float) {
// we pass the float instead of using toString() because
// it may need special formatting. same for double.
writeFloat(name, ((Float)val).floatValue());
writeFloat(name, val.floatValue());
} else if (val instanceof Double) {
writeDouble(name, ((Double) val).doubleValue());
writeDouble(name, val.doubleValue());
} else if (val instanceof Short) {
writeInt(name, val.toString());
} else if (val instanceof Byte) {
writeInt(name, val.toString());
} else if (val instanceof AtomicInteger) {
writeInt(name, ((AtomicInteger) val).get());
} else if (val instanceof AtomicLong) {
writeLong(name, ((AtomicLong) val).get());
} else {
// default... for debugging only
writeStr(name, val.getClass().getName() + ':' + val.toString(), true);

View File

@ -47,6 +47,8 @@ public interface PermissionNameProvider {
SECURITY_EDIT_PERM("security-edit", null),
SECURITY_READ_PERM("security-read", null),
METRICS_READ_PERM("metrics-read", null),
AUTOSCALING_READ_PERM("autoscaling-read", null),
AUTOSCALING_WRITE_PERM("autoscaling-write", null),
ALL("all", unmodifiableSet(new HashSet<>(asList("*", null))))
;
final String name;

View File

@ -0,0 +1,47 @@
{
"documentation": "TODO NOCOMMIT",
"description": "The Scaling API provides API for adding cluster level scaling rules, triggers and event listeners",
"methods": [
"GET",
"POST"
],
"url": {
"paths": [
"/cluster/autoscaling",
"/cluster/autoscaling/diagnostics"
]
},
"commands": {
"set-policy" : {
"type":"object",
"description": "The set-policy command allows you to add and update policies that apply to collections",
/* "patternProperties": {
"^.+$": {
"type": "array"
}
},*/
"additionalProperties": true
},
"set-cluster-policy" : {
"type" : "array",
"description" : "The set-cluster-policy command allows you to add and update cluster-level policy that acts as the base for all collection level policies, if any"
},
"set-cluster-preferences" : {
"type" : "array",
"description" : "The set-cluster-preferences command allows you to add and update cluster-level preferences that are used to sort nodes for selection in cluster operations"
},
"remove-policy": {
"description": "Remove a policy",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the policy to be removed"
}
},
"required": [
"name"
]
}
}
}
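For reference, a hedged SolrJ sketch of posting one of the commands described by this spec (the
payload is lifted from the AutoScalingHandlerTest below; cloudSolrClient is assumed to be an
already-connected CloudSolrClient):

String cmd = "{ 'set-cluster-preferences': [" +
" {'minimize': 'cores', 'precision': 3}," +
" {'maximize': 'freedisk', 'precision': 100}]}";
NamedList<Object> rsp = cloudSolrClient.request(
new V2Request.Builder("/cluster/autoscaling")
.withMethod(SolrRequest.METHOD.POST)
.withPayload(cmd)
.build());
// rsp.get("result") is "success" once the payload validates and is written to ZooKeeper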

View File

@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
/**
* Test for AutoScalingHandler
*/
public class AutoScalingHandlerTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Before
public void beforeTest() throws Exception {
// clear any persisted auto scaling configuration
zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
}
@Test
public void testPolicyAndPreferences() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
// add multiple policies
String setPolicyCommand = "{'set-policy': {" +
" 'xyz':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'!overseer', 'replica':0}" +
" ]," +
" 'policy1':[" +
" {'cores':'<2', 'node':'#ANY'}," +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}" +
" ]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPolicyCommand);
NamedList<Object> response = null;
try {
response = solrClient.request(req);
fail("Adding a policy with 'cores' attribute should not have succeeded.");
} catch (HttpSolrClient.RemoteSolrException e) {
// expected
assertTrue(e.getMessage().contains("cores is only allowed in 'cluster-policy'"));
}
setPolicyCommand = "{'set-policy': {" +
" 'xyz':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'!overseer', 'replica':0}" +
" ]," +
" 'policy1':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}" +
" ]" +
"}}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
Map<String, Object> policies = (Map<String, Object>) loaded.get("policies");
assertNotNull(policies);
assertNotNull(policies.get("xyz"));
assertNotNull(policies.get("policy1"));
// update default policy
setPolicyCommand = "{'set-policy': {" +
" 'xyz':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}" +
" ]" +
"}}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
policies = (Map<String, Object>) loaded.get("policies");
List conditions = (List) policies.get("xyz");
assertEquals(1, conditions.size());
// remove policy
String removePolicyCommand = "{remove-policy : policy1}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, removePolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
policies = (Map<String, Object>) loaded.get("policies");
assertNull(policies.get("policy1"));
// set preferences
String setPreferencesCommand = "{" +
" 'set-cluster-preferences': [" +
" {'minimize': 'cores', 'precision': 3}," +
" {'maximize': 'freedisk','precision': 100}," +
" {'minimize': 'sysLoadAvg','precision': 10}]" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPreferencesCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
List preferences = (List) loaded.get("cluster-preferences");
assertEquals(3, preferences.size());
// set preferences
setPreferencesCommand = "{" +
" 'set-cluster-preferences': [" +
" {'minimize': 'sysLoadAvg','precision': 10}]" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPreferencesCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
preferences = (List) loaded.get("cluster-preferences");
assertEquals(1, preferences.size());
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'!overseer', 'replica':0}" +
" ]" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
List clusterPolicy = (List) loaded.get("cluster-policy");
assertNotNull(clusterPolicy);
assertEquals(3, clusterPolicy.size());
}
@Test
public void testReadApi() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'overseer', 'replica':0}" +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setPreferencesCommand = "{" +
" 'set-cluster-preferences': [" +
" {'minimize': 'cores', 'precision': 3}," +
" {'maximize': 'freedisk','precision': 100}," +
" {'minimize': 'sysLoadAvg','precision': 10}," +
" {'minimize': 'heapUsage','precision': 10}]" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPreferencesCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setPolicyCommand = "{'set-policy': {" +
" 'xyz':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'overseer', 'replica':0}" +
" ]," +
" 'policy1':[" +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}" +
" ]" +
"}}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setPolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// SolrQuery query = new SolrQuery().setParam(CommonParams.QT, path);
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
List<Map> clusterPrefs = (List<Map>) response.get("cluster-preferences");
assertNotNull(clusterPrefs);
assertEquals(4, clusterPrefs.size());
List<Map> clusterPolicy = (List<Map>) response.get("cluster-policy");
assertNotNull(clusterPolicy);
assertEquals(3, clusterPolicy.size());
Map policies = (Map) response.get("policies");
assertNotNull(policies);
assertEquals(2, policies.size());
assertNotNull(policies.get("xyz"));
assertNotNull(policies.get("policy1"));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, "/diagnostics", null);
response = solrClient.request(req);
Map<String, Object> diagnostics = (Map<String, Object>) response.get("diagnostics");
List sortedNodes = (List) diagnostics.get("sortedNodes");
assertNotNull(sortedNodes);
assertEquals(2, sortedNodes.size());
String[] sortedNodeNames = new String[2];
for (int i = 0; i < 2; i++) {
Map node = (Map) sortedNodes.get(i);
assertNotNull(node);
assertEquals(5, node.size());
assertNotNull(sortedNodeNames[i] = (String) node.get("node"));
assertNotNull(node.get("cores"));
assertEquals(0, node.get("cores"));
assertNotNull(node.get("freedisk"));
assertNotNull(node.get("sysLoadAvg"));
assertNotNull(node.get("heapUsage"));
}
List<Map<String, Object>> violations = (List<Map<String, Object>>) diagnostics.get("violations");
assertNotNull(violations);
assertEquals(0, violations.size());
// let's create a collection which violates the rule replica < 2
CollectionAdminRequest.Create create = CollectionAdminRequest.Create.createCollection("readApiTestViolations", 1, 6);
create.setMaxShardsPerNode(10);
CollectionAdminResponse adminResponse = create.process(solrClient);
assertTrue(adminResponse.isSuccess());
// get the diagnostics output again
req = createAutoScalingRequest(SolrRequest.METHOD.GET, "/diagnostics", null);
response = solrClient.request(req);
diagnostics = (Map<String, Object>) response.get("diagnostics");
sortedNodes = (List) diagnostics.get("sortedNodes");
assertNotNull(sortedNodes);
violations = (List<Map<String, Object>>) diagnostics.get("violations");
assertNotNull(violations);
assertEquals(2, violations.size());
for (Map<String, Object> violation : violations) {
assertEquals("readApiTestViolations", violation.get("collection"));
assertEquals("shard1", violation.get("shard"));
assertEquals(Utils.makeMap("replica", "3", "delta", -1), violation.get("violation"));
assertNotNull(violation.get("clause"));
}
}
static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String message) {
return createAutoScalingRequest(m, null, message);
}
static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String subPath, String message) {
boolean useV1 = random().nextBoolean();
String path = useV1 ? "/admin/autoscaling" : "/cluster/autoscaling";
path += subPath != null ? subPath : "";
return useV1
? new AutoScalingRequest(m, path, message)
: new V2Request.Builder(path).withMethod(m).withPayload(message).build();
}
static class AutoScalingRequest extends SolrRequest {
protected final String message;
public AutoScalingRequest(METHOD m, String path, String message) {
super(m, path);
this.message = message;
}
@Override
public SolrParams getParams() {
return null;
}
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
return message != null ? Collections.singletonList(new ContentStreamBase.StringStream(message)) : null;
}
@Override
protected SolrResponse createResponse(SolrClient client) {
return null;
}
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@LuceneTestCase.Slow
public class TestPolicyCloud extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@org.junit.Rule
public ExpectedException expectedException = ExpectedException.none();
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(5)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@After
public void removeCollections() throws Exception {
cluster.deleteAllCollections();
}
public void testDataProvider() throws IOException, SolrServerException, KeeperException, InterruptedException {
CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
.process(cluster.getSolrClient());
DocCollection rulesCollection = getCollectionState("policiesTest");
SolrClientDataProvider provider = new SolrClientDataProvider(cluster.getSolrClient());
Map<String, Object> val = provider.getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList(
"freedisk",
"cores",
"heapUsage",
"sysLoadAvg"));
assertNotNull(val.get("freedisk"));
assertNotNull(val.get("heapUsage"));
assertNotNull(val.get("sysLoadAvg"));
assertTrue(((Number) val.get("cores")).intValue() > 0);
assertTrue("freedisk value is " + ((Number) val.get("freedisk")).longValue(), ((Number) val.get("freedisk")).longValue() > 0);
assertTrue("heapUsage value is " + ((Number) val.get("heapUsage")).longValue(), ((Number) val.get("heapUsage")).longValue() > 0);
assertTrue("sysLoadAvg value is " + ((Number) val.get("sysLoadAvg")).longValue(), ((Number) val.get("sysLoadAvg")).longValue() > 0);
String overseerNode = OverseerTaskProcessor.getLeaderNode(cluster.getZkClient());
cluster.getSolrClient().request(CollectionAdminRequest.addRole(overseerNode, "overseer"));
for (int i = 0; i < 10; i++) {
Map<String, Object> data = cluster.getSolrClient().getZkStateReader().getZkClient().getJson(ZkStateReader.ROLES, true);
if (data != null) break; // the role assignment has been persisted
if (i >= 9) {
throw new RuntimeException("NO overseer node created");
}
Thread.sleep(100);
}
val = provider.getNodeValues(overseerNode, Arrays.asList(
"nodeRole",
"ip_1", "ip_2", "ip_3", "ip_4",
"sysprop.java.version",
"sysprop.java.vendor"));
assertEquals("overseer", val.get("nodeRole"));
assertNotNull(val.get("ip_1"));
assertNotNull(val.get("ip_2"));
assertNotNull(val.get("ip_3"));
assertNotNull(val.get("ip_4"));
assertNotNull(val.get("sysprop.java.version"));
assertNotNull(val.get("sysprop.java.vendor"));
}
}

View File

@ -58,7 +58,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
@ -83,8 +83,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
import static org.apache.solr.common.params.CommonParams.ID;
/**
* SolrJ client class to communicate with SolrCloud.
@ -312,7 +312,7 @@ public class CloudSolrClient extends SolrClient {
assert seconds > 0;
this.collectionStateCache.timeToLive = seconds * 1000L;
}
public ResponseParser getParser() {
return lbClient.getParser();
}
@ -347,6 +347,7 @@ public class CloudSolrClient extends SolrClient {
public ZkStateReader getZkStateReader() {
if (stateProvider instanceof ZkClientClusterStateProvider) {
ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
stateProvider.connect();
return provider.zkStateReader;
}
throw new IllegalStateException("This has no Zk stateReader");
@ -430,7 +431,7 @@ public class CloudSolrClient extends SolrClient {
throw new IllegalArgumentException("This client does not use ZK");
}
/**
* Block until a collection state matches a predicate, or a timeout
*
@ -1210,7 +1211,7 @@ public class CloudSolrClient extends SolrClient {
&& !cacheEntry.shoulRetry()) return col;
}
ClusterState.CollectionRef ref = getCollectionRef(collection);
CollectionRef ref = getCollectionRef(collection);
if (ref == null) {
//no such collection exists
return null;
@ -1245,7 +1246,7 @@ public class CloudSolrClient extends SolrClient {
}
}
ClusterState.CollectionRef getCollectionRef(String collection) {
CollectionRef getCollectionRef(String collection) {
return stateProvider.getState(collection);
}
@ -1407,7 +1408,7 @@ public class CloudSolrClient extends SolrClient {
this.solrUrls.add(solrUrl);
return this;
}
/**
* Provide a list of Solr URL to be used when configuring {@link CloudSolrClient} instances.
* One of the provided values will be used to fetch the list of live Solr

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.cloud.autoscaling.Policy.ReplicaInfo;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.cloud.rule.RemoteCallback;
import org.apache.solr.common.cloud.rule.SnitchContext;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that implements {@link ClusterDataProvider} on top of a {@link CloudSolrClient}
*/
public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
private final CloudSolrClient solrClient;
private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> data = new HashMap<>();
private Set<String> liveNodes;
private Map<String, Object> snitchSession = new HashMap<>();
private Map<String, Map> nodeVsTags = new HashMap<>();
public SolrClientDataProvider(CloudSolrClient solrClient) {
this.solrClient = solrClient;
ZkStateReader zkStateReader = solrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
this.liveNodes = clusterState.getLiveNodes();
Map<String, ClusterState.CollectionRef> all = clusterState.getCollectionStates();
all.forEach((collName, ref) -> {
DocCollection coll = ref.get();
if (coll == null) return;
coll.forEachReplica((shard, replica) -> {
Map<String, Map<String, List<ReplicaInfo>>> nodeData = data.get(replica.getNodeName());
if (nodeData == null) data.put(replica.getNodeName(), nodeData = new HashMap<>());
Map<String, List<ReplicaInfo>> collData = nodeData.get(collName);
if (collData == null) nodeData.put(collName, collData = new HashMap<>());
List<ReplicaInfo> replicas = collData.get(shard);
if (replicas == null) collData.put(shard, replicas = new ArrayList<>());
replicas.add(new ReplicaInfo(replica.getName(), collName, shard, new HashMap<>()));
});
});
}
@Override
public String getPolicyNameByCollection(String coll) {
ClusterState.CollectionRef state = solrClient.getClusterStateProvider().getState(coll);
return state == null || state.get() == null ? null : (String) state.get().getProperties().get("policy");
}
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
AutoScalingSnitch snitch = new AutoScalingSnitch();
ClientSnitchCtx ctx = new ClientSnitchCtx(null, node, snitchSession, solrClient);
snitch.getTags(node, new HashSet<>(tags), ctx);
nodeVsTags.put(node, ctx.getTags());
return ctx.getTags();
}
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return data.getOrDefault(node, Collections.emptyMap());//todo fill other details
}
@Override
public Collection<String> getNodes() {
return liveNodes;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put("liveNodes", liveNodes);
ew.put("replicaInfo", Utils.getDeepCopy(data, 5));
ew.put("nodeValues", nodeVsTags);
}
static class ClientSnitchCtx
extends SnitchContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ZkClientClusterStateProvider zkClientClusterStateProvider;
CloudSolrClient solrClient;
public ClientSnitchCtx(SnitchInfo perSnitch,
String node, Map<String, Object> session,
CloudSolrClient solrClient) {
super(perSnitch, node, session);
this.solrClient = solrClient;
this.zkClientClusterStateProvider = (ZkClientClusterStateProvider) solrClient.getClusterStateProvider();
}
public Map getZkJson(String path) {
try {
byte[] data = zkClientClusterStateProvider.getZkStateReader().getZkClient().getData(path, null, new Stat(), true);
if (data == null) return null;
return (Map) Utils.fromJSON(data);
} catch (Exception e) {
log.warn("Unable to read from ZK path: " + path, e);
return null;
}
}
public void invokeRemote(String node, ModifiableSolrParams params, String klas, RemoteCallback callback) {
}
public SimpleSolrResponse invoke(String solrNode, String path, SolrParams params)
throws IOException, SolrServerException {
String url = zkClientClusterStateProvider.getZkStateReader().getBaseUrlForNodeName(solrNode);
GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
try (HttpSolrClient client = new HttpSolrClient.Builder()
.withHttpClient(solrClient.getHttpClient())
.withBaseSolrUrl(url)
.withResponseParser(new BinaryResponseParser())
.build()) {
NamedList<Object> rsp = client.request(request);
request.response.nl = rsp;
return request.response;
}
}
}
//uses metrics API to get node information
static class AutoScalingSnitch extends ImplicitSnitch {
@Override
protected void getRemoteInfo(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
ClientSnitchCtx snitchContext = (ClientSnitchCtx) ctx;
readSysProps(solrNode, requestedTags, snitchContext);
Set<String> groups = new HashSet<>();
List<String> prefixes = new ArrayList<>();
if (requestedTags.contains(DISK)) {
groups.add("solr.node");
prefixes.add("CONTAINER.fs.usableSpace");
}
if (requestedTags.contains(CORES)) {
groups.add("solr.core");
prefixes.add("CORE.coreName");
}
if (requestedTags.contains(SYSLOADAVG)) {
groups.add("solr.jvm");
prefixes.add("os.systemLoadAverage");
}
if (requestedTags.contains(HEAPUSAGE)) {
groups.add("solr.jvm");
prefixes.add("memory.heap.usage");
}
if (groups.isEmpty() || prefixes.isEmpty()) return;
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("group", StrUtils.join(groups, ','));
params.add("prefix", StrUtils.join(prefixes, ','));
try {
SimpleSolrResponse rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
Map m = rsp.nl.asMap(4);
if (requestedTags.contains(DISK)) {
Number n = (Number) Utils.getObjectByPath(m, true, "metrics/solr.node/CONTAINER.fs.usableSpace");
if (n != null) ctx.getTags().put(DISK, n.doubleValue() / 1024.0d / 1024.0d / 1024.0d);
}
if (requestedTags.contains(CORES)) {
int count = 0;
Map cores = (Map) m.get("metrics");
for (Object o : cores.keySet()) {
if (o.toString().startsWith("solr.core.")) count++;
}
ctx.getTags().put(CORES, count);
}
if (requestedTags.contains(SYSLOADAVG)) {
Number n = (Number) Utils.getObjectByPath(m, true, "metrics/solr.jvm/os.systemLoadAverage");
if (n != null) ctx.getTags().put(SYSLOADAVG, n.doubleValue() * 100.0d);
}
if (requestedTags.contains(HEAPUSAGE)) {
Number n = (Number) Utils.getObjectByPath(m, true, "metrics/solr.jvm/memory.heap.usage");
if (n != null) ctx.getTags().put(HEAPUSAGE, n.doubleValue() * 100.0d);
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching metrics from node " + solrNode, e);
}
}
private void readSysProps(String solrNode, Set<String> requestedTags, ClientSnitchCtx snitchContext) {
List<String> prefixes = null;
ModifiableSolrParams params;
List<String> sysProp = null;
for (String tag : requestedTags) {
if (!tag.startsWith(SYSPROP)) continue;
if (sysProp == null) {
prefixes = new ArrayList<>();
sysProp = new ArrayList<>();
prefixes.add("system.properties");
}
sysProp.add(tag.substring(SYSPROP.length()));
}
if (sysProp == null) return;
params = new ModifiableSolrParams();
params.add("prefix", StrUtils.join(prefixes, ','));
for (String s : sysProp) params.add("property", s);
try {
SimpleSolrResponse rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
Map m = rsp.nl.asMap(6);
for (String s : sysProp) {
Object v = Utils.getObjectByPath(m, true,
Arrays.asList("metrics", "solr.jvm", "system.properties", s));
if (v != null) snitchContext.getTags().put("sysprop." + s, v);
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching system properties from node " + solrNode, e);
}
}
}
}
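A minimal usage sketch, not part of the patch: building the provider from a CloudSolrClient and dumping a few node tags. The ZK address and the tag names are illustrative assumptions.

    // Sketch only: assumes a SolrCloud cluster reachable at localhost:9983.
    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withZkHost("localhost:9983")
        .build()) {
      client.connect();
      SolrClientDataProvider provider = new SolrClientDataProvider(client);
      for (String node : provider.getNodes()) {
        // tag names are assumptions mirroring the snitch tags above
        Map<String, Object> tags = provider.getNodeValues(node,
            Arrays.asList("freedisk", "cores", "sysLoadAvg"));
        System.out.println(node + " -> " + tags);
      }
    }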

View File

@ -39,10 +39,16 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
ZkStateReader zkStateReader;
private boolean closeZkStateReader = true;
String zkHost;
int zkConnectTimeout = 10000;
int zkClientTimeout = 10000;
public ZkClientClusterStateProvider(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
this.closeZkStateReader = false;
}
public ZkClientClusterStateProvider(Collection<String> zkHosts, String chroot) {
zkHost = buildZkHostString(zkHosts,chroot);
}
@ -55,6 +61,9 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
public ClusterState.CollectionRef getState(String collection) {
return zkStateReader.getClusterState().getCollectionRef(collection);
}
public ZkStateReader getZkStateReader(){
return zkStateReader;
}
@Override
public Set<String> liveNodes() {
@ -151,7 +160,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public void close() throws IOException {
if (zkStateReader != null) {
if (zkStateReader != null && closeZkStateReader) {
synchronized (this) {
if (zkStateReader != null)
zkStateReader.close();

View File

@ -102,7 +102,9 @@ public class V2Request extends SolrRequest {
* @return builder object
*/
public Builder withPayload(String payload) {
this.payload = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
if (payload != null) {
this.payload = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
}
return this;
}
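For context, a hedged sketch of building a payload-bearing v2 request; the /cluster/autoscaling path matches SOLR-10373 in this commit, but the set-cluster-policy command name is an assumption.

    // Sketch only: the command name inside the payload is an assumption.
    V2Request req = new V2Request.Builder("/cluster/autoscaling")
        .withMethod(SolrRequest.METHOD.POST)
        .withPayload("{\"set-cluster-policy\": [{\"replica\": \"<2\", \"shard\": \"#EACH\", \"node\": \"#ANY\"}]}")
        .build();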

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.Policy.Suggester;
class AddReplicaSuggester extends Suggester {
SolrRequest init() {
SolrRequest operation = tryEachNode(true);
if (operation == null) operation = tryEachNode(false);
return operation;
}
SolrRequest tryEachNode(boolean strict) {
String coll = (String) hints.get(Hint.COLL);
String shard = (String) hints.get(Hint.SHARD);
if (coll == null || shard == null)
throw new RuntimeException("add-replica requires 'collection' and 'shard'");
//iterate over the nodes, least loaded first, and pick the target with the least serious violations
List<Clause.Violation> leastSeriousViolation = null;
Integer targetNodeIndex = null;
for (int i = getMatrix().size() - 1; i >= 0; i--) {
Row row = getMatrix().get(i);
if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
Row tmpRow = row.addReplica(coll, shard);
tmpRow.violations.clear();
List<Clause.Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i));
if(!containsNewErrors(errs)) {
if(isLessSerious(errs, leastSeriousViolation)){
leastSeriousViolation = errs;
targetNodeIndex = i;
}
}
}
if (targetNodeIndex != null) {// found a target node that introduces no new violations
getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(coll, shard));
return CollectionAdminRequest
.addReplicaToShard(coll, shard)
.setNode(getMatrix().get(targetNodeIndex).node);
}
return null;
}
}
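A hedged sketch of how this suggester is reached through a Policy session; the session variable, collection and shard names are placeholders.

    // Sketch only: 'session' is a Policy.Session created from a ClusterDataProvider.
    SolrRequest op = session.getSuggester(CollectionAction.ADDREPLICA)
        .hint(Hint.COLL, "mycoll")
        .hint(Hint.SHARD, "shard1")
        .getOperation(); // an AddReplica request for the chosen node, or null if none fits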

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.HashMap;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Utils;
class Cell implements MapWriter {
final int index;
final String name;
Object val, approxVal;
Cell(int index, String name, Object val) {
this.index = index;
this.name = name;
this.val = val;
}
Cell(int index, String name, Object val, Object approxVal) {
this.index = index;
this.name = name;
this.val = val;
this.approxVal = approxVal;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, val);
}
@Override
public String toString() {
return Utils.toJSONString(this.toMap(new HashMap<>()));
}
public Cell copy() {
return new Cell(index, name, val, approxVal);
}
}

View File

@ -0,0 +1,335 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.cloud.autoscaling.Policy.ReplicaInfo;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import static java.util.Collections.singletonMap;
import static org.apache.solr.cloud.autoscaling.Clause.TestStatus.PASS;
import static org.apache.solr.cloud.autoscaling.Operand.EQUAL;
import static org.apache.solr.cloud.autoscaling.Operand.GREATER_THAN;
import static org.apache.solr.cloud.autoscaling.Operand.LESS_THAN;
import static org.apache.solr.cloud.autoscaling.Operand.NOT_EQUAL;
import static org.apache.solr.cloud.autoscaling.Operand.WILDCARD;
import static org.apache.solr.cloud.autoscaling.Policy.ANY;
import static org.apache.solr.common.params.CoreAdminParams.COLLECTION;
import static org.apache.solr.common.params.CoreAdminParams.REPLICA;
import static org.apache.solr.common.params.CoreAdminParams.SHARD;
// a set of conditions in a policy
public class Clause implements MapWriter, Comparable<Clause> {
Map<String, Object> original;
Condition collection, shard, replica, tag, globalTag;
boolean strict = true;
Clause(Map<String, Object> m) {
this.original = m;
strict = Boolean.parseBoolean(String.valueOf(m.getOrDefault("strict", "true")));
Optional<String> globalTagName = m.keySet().stream().filter(Policy.GLOBAL_ONLY_TAGS::contains).findFirst();
if (globalTagName.isPresent()) {
globalTag = parse(globalTagName.get(), m);
if (m.size() > 2) {
throw new RuntimeException("Only one extra tag supported for the tag " + globalTagName.get() + " in " + Utils.toJSONString(m));
}
tag = parse(m.keySet().stream()
.filter(s -> (!globalTagName.get().equals(s) && !IGNORE_TAGS.contains(s)))
.findFirst().get(), m);
} else {
collection = parse(COLLECTION, m);
shard = parse(SHARD, m);
if (m.get(REPLICA) == null) {
throw new RuntimeException(StrUtils.formatString("''replica'' is required in {0}", Utils.toJSONString(m)));
}
Condition replica = parse(REPLICA, m);
try {
int replicaCount = Integer.parseInt(String.valueOf(replica.val));
if (replicaCount < 0) {
throw new RuntimeException("replica value should be non-negative " + Utils.toJSONString(m));
}
this.replica = new Condition(replica.name, replicaCount, replica.op);
} catch (NumberFormatException e) {
throw new RuntimeException("Only an integer value is supported for replica " + Utils.toJSONString(m));
}
m.forEach((s, o) -> parseCondition(s, o));
}
if (tag == null)
throw new RuntimeException("Invalid op, must have one and only one tag other than collection, shard,replica " + Utils.toJSONString(m));
}
public boolean doesOverride(Clause that) {
return (collection.equals(that.collection) &&
tag.name.equals(that.tag.name));
}
public boolean isPerCollectiontag() {
return globalTag == null;
}
void parseCondition(String s, Object o) {
if (IGNORE_TAGS.contains(s)) return;
if (tag != null) {
throw new IllegalArgumentException("Only one tag other than collection, shard, replica is possible");
}
tag = parse(s, singletonMap(s, o));
}
@Override
public int compareTo(Clause that) {
try {
int v = Integer.compare(this.tag.op.priority, that.tag.op.priority);
if (v != 0) return v;
if (this.isPerCollectiontag() && that.isPerCollectiontag()) {
v = Integer.compare(this.replica.op.priority, that.replica.op.priority);
if (v == 0) {
v = Integer.compare((Integer) this.replica.val, (Integer) that.replica.val);
v = this.replica.op == LESS_THAN ? v : v * -1;
}
return v;
} else {
return 0;
}
} catch (NullPointerException e) {
throw e;
}
}
static class Condition {
final String name;
final Object val;
final Operand op;
Condition(String name, Object val, Operand op) {
this.name = name;
this.val = val;
this.op = op;
}
TestStatus match(Row row) {
return op.match(val, row.getVal(name));
}
TestStatus match(Object testVal) {
return op.match(this.val, testVal);
}
boolean isPass(Object inputVal) {
return op.match(val, inputVal) == PASS;
}
boolean isPass(Row row) {
return op.match(val, row.getVal(name)) == PASS;
}
@Override
public boolean equals(Object that) {
if (that instanceof Condition) {
Condition c = (Condition) that;
return Objects.equals(c.name, name) && Objects.equals(c.val, val) && c.op == op;
}
return false;
}
public Integer delta(Object val) {
return op.delta(this.val, val);
}
}
static Condition parse(String s, Map m) {
Object expectedVal = null;
Object val = m.get(s);
try {
String conditionName = s.trim();
String value = val == null ? null : String.valueOf(val).trim();
Operand operand = null;
if ((expectedVal = WILDCARD.parse(value)) != null) {
operand = WILDCARD;
} else if ((expectedVal = NOT_EQUAL.parse(value)) != null) {
operand = NOT_EQUAL;
} else if ((expectedVal = GREATER_THAN.parse(value)) != null) {
operand = GREATER_THAN;
} else if ((expectedVal = LESS_THAN.parse(value)) != null) {
operand = LESS_THAN;
} else {
operand = EQUAL;
expectedVal = EQUAL.parse(value);
}
return new Condition(conditionName, expectedVal, operand);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid tag : " + s + ":" + val, e);
}
}
public class Violation implements MapWriter {
final String shard, coll, node;
final Object actualVal;
final Integer delta;//how far is the actual value from the expected value
final Object tagKey;
private final int hash;
private Violation(String coll, String shard, String node, Object actualVal, Integer delta, Object tagKey) {
this.shard = shard;
this.coll = coll;
this.node = node;
this.delta = delta;
this.actualVal = actualVal;
this.tagKey = tagKey;
hash = ("" + coll + " " + shard + " " + node + " " + String.valueOf(tagKey) + " " + Utils.toJSONString(getClause().toMap(new HashMap<>()))).hashCode();
}
public Clause getClause() {
return Clause.this;
}
@Override
public int hashCode() {
return hash;
}
//if the delta is lower, this violation is less serious
public boolean isLessSerious(Violation that) {
return that.delta != null && delta != null &&
Math.abs(delta) < Math.abs(that.delta);
}
@Override
public boolean equals(Object that) {
if (that instanceof Violation) {
Violation v = (Violation) that;
return Objects.equals(this.shard, v.shard) &&
Objects.equals(this.coll, v.coll) &&
Objects.equals(this.node, v.node) &&
Objects.equals(this.tagKey, v.tagKey)
;
}
return false;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.putIfNotNull("collection", coll);
ew.putIfNotNull("shard", shard);
ew.putIfNotNull("node", node);
ew.putIfNotNull("tagKey", String.valueOf(tagKey));
ew.putIfNotNull("violation", (MapWriter) ew1 -> {
ew1.put(getClause().isPerCollectiontag() ? "replica" : tag.name,
String.valueOf(actualVal));
ew1.putIfNotNull("delta", delta);
});
ew.put("clause", getClause());
}
}
public List<Violation> test(List<Row> allRows) {
List<Violation> violations = new ArrayList<>();
if (isPerCollectiontag()) {
Map<String, Map<String, Map<String, AtomicInteger>>> replicaCount = computeReplicaCounts(allRows);
for (Map.Entry<String, Map<String, Map<String, AtomicInteger>>> e : replicaCount.entrySet()) {
if (!collection.isPass(e.getKey())) continue;
for (Map.Entry<String, Map<String, AtomicInteger>> shardVsCount : e.getValue().entrySet()) {
if (!shard.isPass(shardVsCount.getKey())) continue;
for (Map.Entry<String, AtomicInteger> counts : shardVsCount.getValue().entrySet()) {
if (!replica.isPass(counts.getValue())) {
violations.add(new Violation(
e.getKey(),
shardVsCount.getKey(),
tag.name.equals("node") ? counts.getKey() : null,
counts.getValue(),
replica.delta(counts.getValue()),
counts.getKey()
));
}
}
}
}
} else {
for (Row r : allRows) {
if (!tag.isPass(r)) {
violations.add(new Violation(null, null, r.node, r.getVal(tag.name), tag.delta(r.getVal(tag.name)), null));
}
}
}
return violations;
}
private Map<String, Map<String, Map<String, AtomicInteger>>> computeReplicaCounts(List<Row> allRows) {
Map<String, Map<String, Map<String, AtomicInteger>>> collVsShardVsTagVsCount = new HashMap<>();
for (Row row : allRows)
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> colls : row.collectionVsShardVsReplicas.entrySet()) {
String collectionName = colls.getKey();
if (!collection.isPass(collectionName)) continue;
collVsShardVsTagVsCount.putIfAbsent(collectionName, new HashMap<>());
Map<String, Map<String, AtomicInteger>> collMap = collVsShardVsTagVsCount.get(collectionName);
for (Map.Entry<String, List<ReplicaInfo>> shards : colls.getValue().entrySet()) {
String shardName = shards.getKey();
if (ANY.equals(shard.val)) shardName = ANY;
if (!shard.isPass(shardName)) break;
collMap.putIfAbsent(shardName, new HashMap<>());
Map<String, AtomicInteger> tagVsCount = collMap.get(shardName);
Object tagVal = row.getVal(tag.name);
tagVsCount.putIfAbsent(tag.isPass(tagVal) ? String.valueOf(tagVal) : "", new AtomicInteger());
if (tag.isPass(tagVal)) {
tagVsCount.get(String.valueOf(tagVal)).addAndGet(shards.getValue().size());
}
}
}
return collVsShardVsTagVsCount;
}
public boolean isStrict() {
return strict;
}
@Override
public String toString() {
return Utils.toJSONString(original);
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
for (Map.Entry<String, Object> e : original.entrySet()) ew.put(e.getKey(), e.getValue());
}
enum TestStatus {
NOT_APPLICABLE, FAIL, PASS
}
private static final Set<String> IGNORE_TAGS = new HashSet<>(Arrays.asList(REPLICA, COLLECTION, SHARD, "strict"));
}
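To make the accepted clause shape concrete, a sketch assuming same-package access (the Clause constructor is package-private) and a prepared rows list:

    // Sketch only: "fewer than 2 replicas of each shard on any single node".
    Clause clause = new Clause((Map<String, Object>) Utils.fromJSONString(
        "{'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}"));
    List<Clause.Violation> violations = clause.test(rows); // rows: List<Row> for the cluster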

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public interface ClusterDataProvider extends Closeable {
/** Get the value of each tag for a given node.
*
* @param node node name
* @param tags tag names
* @return a map of tag name to value
*/
Map<String, Object> getNodeValues(String node, Collection<String> tags);
/**
* Get the details of each replica on a node. It attempts to fetch as many of the details
* named in the keys list as possible; it is not required to return all of them.
* <p>
* The format is {collection: {shard: [{replica-details}]}}
*/
Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys);
Collection<String> getNodes();
/** Get the name of the policy that applies to the given collection, if any.
*/
String getPolicyNameByCollection(String coll);
@Override
default void close() throws IOException {
}
}
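A minimal in-memory implementation sketch, e.g. for tests; all node names and values are placeholders.

    ClusterDataProvider cdp = new ClusterDataProvider() {
      @Override
      public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
        return Collections.singletonMap("cores", 0); // placeholder tag value
      }
      @Override
      public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
        return Collections.emptyMap(); // this sketch hosts no replicas
      }
      @Override
      public Collection<String> getNodes() {
        return Arrays.asList("node1:8983_solr", "node2:8983_solr");
      }
      @Override
      public String getPolicyNameByCollection(String coll) {
        return null; // no collection-specific policy
      }
    };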

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.Clause.Violation;
import org.apache.solr.cloud.autoscaling.Policy.ReplicaInfo;
import org.apache.solr.cloud.autoscaling.Policy.Suggester;
import org.apache.solr.common.util.Pair;
public class MoveReplicaSuggester extends Suggester {
@Override
SolrRequest init() {
SolrRequest operation = tryEachNode(true);
if (operation == null) operation = tryEachNode(false);
return operation;
}
SolrRequest tryEachNode(boolean strict) {
//iterate through elements and identify the least loaded
List<Clause.Violation> leastSeriousViolation = null;
Integer targetNodeIndex = null;
Integer fromNodeIndex = null;
ReplicaInfo fromReplicaInfo = null;
for (Pair<ReplicaInfo, Row> fromReplica : getValidReplicas(true, true, -1)) {
Row fromRow = fromReplica.second();
ReplicaInfo replicaInfo = fromReplica.first();
String coll = replicaInfo.collection;
String shard = replicaInfo.shard;
Pair<Row, ReplicaInfo> pair = fromRow.removeReplica(coll, shard);
Row tmpRow = pair.first();
if (tmpRow == null) {
//no such replica available
continue;
}
tmpRow.violations.clear();
final int i = getMatrix().indexOf(fromRow);
for (int j = getMatrix().size() - 1; j > i; j--) {
Row targetRow = getMatrix().get(j);
if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue;
targetRow = targetRow.addReplica(coll, shard);
targetRow.violations.clear();
List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), tmpRow, i), targetRow, j));
if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation)) {
leastSeriousViolation = errs;
targetNodeIndex = j;
fromNodeIndex = i;
fromReplicaInfo = replicaInfo;
}
}
}
if (targetNodeIndex != null && fromNodeIndex != null) {
getMatrix().set(fromNodeIndex, getMatrix().get(fromNodeIndex).removeReplica(fromReplicaInfo.collection, fromReplicaInfo.shard).first());
getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(fromReplicaInfo.collection, fromReplicaInfo.shard));
return new CollectionAdminRequest.MoveReplica(
fromReplicaInfo.collection,
fromReplicaInfo.name,
getMatrix().get(targetNodeIndex).node);
}
return null;
}
}
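A hedged sketch of driving this suggester; the hint value is a placeholder.

    // Sketch only: restrict the move to a specific target node.
    SolrRequest op = session.getSuggester(CollectionAction.MOVEREPLICA)
        .hint(Hint.TARGET_NODE, "node2:8983_solr")
        .getOperation(); // a CollectionAdminRequest.MoveReplica, or null if no move helps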

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.util.Objects;
import org.apache.solr.cloud.autoscaling.Clause.TestStatus;
import static org.apache.solr.cloud.autoscaling.Clause.TestStatus.FAIL;
import static org.apache.solr.cloud.autoscaling.Clause.TestStatus.NOT_APPLICABLE;
import static org.apache.solr.cloud.autoscaling.Clause.TestStatus.PASS;
import static org.apache.solr.cloud.autoscaling.Policy.ANY;
public enum Operand {
WILDCARD(ANY, Integer.MAX_VALUE) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
return testVal == null ? NOT_APPLICABLE : PASS;
}
@Override
public Object parse(String val) {
if (val == null) return ANY;
return ANY.equals(val) || Policy.EACH.equals(val) ? val : null;
}
},
EQUAL("", 0) {
@Override
public int _delta(int expected, int actual) {
return expected - actual;
}
},
NOT_EQUAL("!", 2) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
return super.match(ruleVal, testVal) == PASS ? FAIL : PASS;
}
@Override
public int _delta(int expected, int actual) {
return expected - actual;
}
},
GREATER_THAN(">", 1) {
@Override
public Object parse(String val) {
return checkNumeric(super.parse(val));
}
@Override
public TestStatus match(Object ruleVal, Object testVal) {
if (testVal == null) return NOT_APPLICABLE;
return compareNum(ruleVal, testVal) == 1 ? PASS : FAIL;
}
@Override
protected int _delta(int expected, int actual) {
return actual > expected ? 0 : (expected + 1) - actual;
}
},
LESS_THAN("<", 2) {
@Override
public TestStatus match(Object ruleVal, Object testVal) {
if (testVal == null) return NOT_APPLICABLE;
return compareNum(ruleVal, testVal) == -1 ? PASS : FAIL;
}
@Override
protected int _delta(int expected, int actual) {
return actual < expected ? 0 : expected - actual;
}
@Override
public Object parse(String val) {
return checkNumeric(super.parse(val));
}
};
public final String operand;
final int priority;
Operand(String val, int priority) {
this.operand = val;
this.priority = priority;
}
public String toStr(Object expectedVal) {
return operand + expectedVal.toString();
}
Integer checkNumeric(Object val) {
if (val == null) return null;
try {
return Integer.parseInt(val.toString());
} catch (NumberFormatException e) {
throw new RuntimeException("for operand " + operand + " the value must be numeric");
}
}
public Object parse(String val) {
if (operand.isEmpty()) return val;
return val.startsWith(operand) ? val.substring(1) : null;
}
public TestStatus match(Object ruleVal, Object testVal) {
return Objects.equals(String.valueOf(ruleVal), String.valueOf(testVal)) ? PASS : FAIL;
}
public int compareNum(Object n1Val, Object n2Val) {
Integer n1 = (Integer) parseObj(n1Val, Integer.class);
Integer n2 = (Integer) parseObj(n2Val, Integer.class);
return n1 > n2 ? -1 : Objects.equals(n1, n2) ? 0 : 1;
}
Object parseObj(Object o, Class typ) {
if (o == null) return o;
if (typ == String.class) return String.valueOf(o);
if (typ == Integer.class) {
return Integer.parseInt(String.valueOf(o));
}
return o;
}
public Integer delta(Object expected, Object actual) {
try {
Integer expectedInt = Integer.parseInt(String.valueOf(expected));
Integer actualInt = Integer.parseInt(String.valueOf(actual));
return _delta(expectedInt, actualInt);
} catch (Exception e) {
return null;
}
}
protected int _delta(int expected, int actual) {
return 0;
}
}
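A small sketch of the parse/match/delta contract (Clause.TestStatus is package-private, so assume same-package code):

    Object ruleVal = Operand.GREATER_THAN.parse(">3");            // yields Integer 3
    Clause.TestStatus s = Operand.GREATER_THAN.match(ruleVal, 4); // PASS, since 4 > 3
    Integer gap = Operand.GREATER_THAN.delta(3, 1);               // 3: the actual must rise to 4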

View File

@ -0,0 +1,508 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.cloud.autoscaling.Clause.Violation;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
/* The class that reads, parses and applies the policies specified in
* autoscaling.json
*
* Create one instance of this class per unique autoscaling.json.
* Instances are immutable and thread-safe.
*
* Create a fresh session for each use.
*/
public class Policy implements MapWriter {
public static final String EACH = "#EACH";
public static final String ANY = "#ANY";
public static final String CLUSTER_POLICY = "cluster-policy";
public static final String CLUSTER_PREFERENCE = "cluster-preferences";
public static final Set<String> GLOBAL_ONLY_TAGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("cores")));
final Map<String, List<Clause>> policies = new HashMap<>();
final List<Clause> clusterPolicy;
final List<Preference> clusterPreferences;
final List<String> params = new ArrayList<>();
public Policy(Map<String, Object> jsonMap) {
clusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCE, emptyList())).stream()
.map(Preference::new)
.collect(toList());
for (int i = 0; i < clusterPreferences.size() - 1; i++) {
Preference preference = clusterPreferences.get(i);
preference.next = clusterPreferences.get(i + 1);
}
if (clusterPreferences.isEmpty()) {
clusterPreferences.add(new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}")));
}
clusterPolicy = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_POLICY, emptyList())).stream()
.map(Clause::new)
.collect(Collectors.toList());
((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault("policies", emptyMap())).forEach((s, l1) ->
this.policies.put(s, l1.stream()
.map(Clause::new)
.sorted()
.collect(toList())));
this.policies.forEach((s, c) -> {
for (Clause clause : c) {
if (!clause.isPerCollectiontag())
throw new RuntimeException(clause.globalTag.name + " is only allowed in 'cluster-policy'");
}
});
for (Preference preference : clusterPreferences) {
if (params.contains(preference.name.name())) {
throw new RuntimeException(preference.name + " is repeated");
}
params.add(preference.name.toString());
preference.idx = params.size() - 1;
}
}
public List<Clause> getClusterPolicy() {
return clusterPolicy;
}
public List<Preference> getClusterPreferences() {
return clusterPreferences;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
if (!policies.isEmpty()) {
ew.put("policies", (MapWriter) ew1 -> {
for (Map.Entry<String, List<Clause>> e : policies.entrySet()) {
ew1.put(e.getKey(), e.getValue());
}
});
}
if (!clusterPreferences.isEmpty()) {
ew.put("preferences", (IteratorWriter) iw -> {
for (Preference p : clusterPreferences) iw.add(p);
});
}
}
/* This stores the logical state of the system, given a policy and
* a cluster state.
*
*/
public class Session implements MapWriter {
final List<String> nodes;
final ClusterDataProvider dataProvider;
final List<Row> matrix;
Set<String> collections = new HashSet<>();
List<Clause> expandedClauses;
List<Violation> violations = new ArrayList<>();
private List<String> paramsOfInterest;
private Session(List<String> nodes, ClusterDataProvider dataProvider,
List<Row> matrix, List<Clause> expandedClauses,
List<String> paramsOfInterest) {
this.nodes = nodes;
this.dataProvider = dataProvider;
this.matrix = matrix;
this.expandedClauses = expandedClauses;
this.paramsOfInterest = paramsOfInterest;
}
Session(ClusterDataProvider dataProvider) {
this.nodes = new ArrayList<>(dataProvider.getNodes());
this.dataProvider = dataProvider;
for (String node : nodes) {
collections.addAll(dataProvider.getReplicaInfo(node, Collections.emptyList()).keySet());
}
expandedClauses = clusterPolicy.stream()
.filter(clause -> !clause.isPerCollectiontag())
.collect(Collectors.toList());
for (String c : collections) {
addClausesForCollection(dataProvider, c);
}
Collections.sort(expandedClauses);
List<String> p = new ArrayList<>(params);
p.addAll(expandedClauses.stream().map(clause -> clause.tag.name).distinct().collect(Collectors.toList()));
paramsOfInterest = new ArrayList<>(p);
matrix = new ArrayList<>(nodes.size());
for (String node : nodes) matrix.add(new Row(node, paramsOfInterest, dataProvider));
applyRules();
}
private void addClausesForCollection(ClusterDataProvider dataProvider, String c) {
String p = dataProvider.getPolicyNameByCollection(c);
if (p != null) {
List<Clause> perCollPolicy = policies.get(p);
if (perCollPolicy == null)
throw new RuntimeException(StrUtils.formatString("Policy for collection {0} is {1} . It does not exist", c, p));
}
expandedClauses.addAll(mergePolicies(c, policies.getOrDefault(p, emptyList()), clusterPolicy));
}
Session copy() {
return new Session(nodes, dataProvider, getMatrixCopy(), expandedClauses, paramsOfInterest);
}
List<Row> getMatrixCopy() {
return matrix.stream()
.map(Row::copy)
.collect(Collectors.toList());
}
Policy getPolicy() {
return Policy.this;
}
/**
* Apply the preferences and conditions
*/
private void applyRules() {
if (!clusterPreferences.isEmpty()) {
//this is to set the approximate value according to the precision
ArrayList<Row> tmpMatrix = new ArrayList<>(matrix);
for (Preference p : clusterPreferences) {
Collections.sort(tmpMatrix, (r1, r2) -> p.compare(r1, r2, false));
p.setApproxVal(tmpMatrix);
}
//approximate values are set now. Let's do recursive sorting
Collections.sort(matrix, (r1, r2) -> {
int result = clusterPreferences.get(0).compare(r1, r2, true);
if (result == 0) result = clusterPreferences.get(0).compare(r1, r2, false);
return result;
});
}
for (Clause clause : expandedClauses) {
List<Violation> errs = clause.test(matrix);
violations.addAll(errs);
}
}
public List<Violation> getViolations() {
return violations;
}
public Suggester getSuggester(CollectionAction action) {
Suggester op = ops.get(action).get();
if (op == null) throw new UnsupportedOperationException(action.toString() + " is not supported");
op._init(this);
return op;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
for (int i = 0; i < matrix.size(); i++) {
Row row = matrix.get(i);
ew.put(row.node, row);
}
}
@Override
public String toString() {
return Utils.toJSONString(toMap(new LinkedHashMap<>()));
}
public List<Row> getSorted() {
return Collections.unmodifiableList(matrix);
}
}
public Session createSession(ClusterDataProvider dataProvider) {
return new Session(dataProvider);
}
enum SortParam {
freedisk(0, Integer.MAX_VALUE), cores(0, Integer.MAX_VALUE), heapUsage(0, Integer.MAX_VALUE), sysLoadAvg(0, 100);
public final int min,max;
SortParam(int min, int max) {
this.min = min;
this.max = max;
}
static SortParam get(String m) {
for (SortParam p : values()) if (p.name().equals(m)) return p;
throw new RuntimeException(StrUtils.formatString("Invalid sort {0} Sort must be on one of these {1}", m, Arrays.asList(values())));
}
}
enum Sort {
maximize(1), minimize(-1);
final int sortval;
Sort(int i) {
sortval = i;
}
static Sort get(Map<String, Object> m) {
if (m.containsKey(maximize.name()) && m.containsKey(minimize.name())) {
throw new RuntimeException("Cannot have both 'maximize' and 'minimize'");
}
if (m.containsKey(maximize.name())) return maximize;
if (m.containsKey(minimize.name())) return minimize;
throw new RuntimeException("must have either 'maximize' or 'minimize'");
}
}
public static class ReplicaInfo implements MapWriter {
final String name;
String core, collection, shard;
Map<String, Object> variables;
public ReplicaInfo(String name, String coll, String shard, Map<String, Object> vals) {
this.name = name;
this.variables = vals;
this.collection = coll;
this.shard = shard;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, variables);
}
public String getCore() {
return core;
}
public String getCollection() {
return collection;
}
public String getShard() {
return shard;
}
}
/* A suggester is capable of suggesting a collection operation
* given a particular session. Before it suggests a new operation,
* it ensures that:
* a) load is reduced on the most loaded node
* b) it causes no new violations
*
*/
public static abstract class Suggester {
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
Policy.Session session;
SolrRequest operation;
protected List<Violation> originalViolations = new ArrayList<>();
private boolean isInitialized = false;
private void _init(Session session) {
this.session = session.copy();
}
public Suggester hint(Hint hint, Object value) {
hints.put(hint, value);
return this;
}
abstract SolrRequest init();
public SolrRequest getOperation() {
if (!isInitialized) {
String coll = (String) hints.get(Hint.COLL);
String shard = (String) hints.get(Hint.SHARD);
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(coll))) {
session.addClausesForCollection(session.dataProvider, coll);
Collections.sort(session.expandedClauses);
}
if (coll != null) {
for (Row row : session.matrix) {
if (!row.collectionVsShardVsReplicas.containsKey(coll)) row.collectionVsShardVsReplicas.put(coll, new HashMap<>());
if (shard != null) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.get(coll);
if (!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>());
}
}
}
session.applyRules();
originalViolations.addAll(session.getViolations());
this.operation = init();
isInitialized = true;
}
return operation;
}
public Session getSession() {
return session;
}
List<Row> getMatrix() {
return session.matrix;
}
//check if the fresh set of violations is less serious than the last set of violations
boolean isLessSerious(List<Violation> fresh, List<Violation> old) {
if (old == null || fresh.size() < old.size()) return true;
if (fresh.size() == old.size()) {
for (int i = 0; i < fresh.size(); i++) {
Violation freshViolation = fresh.get(i);
Violation oldViolation = null;
for (Violation v : old) {
if (v.equals(freshViolation)) oldViolation = v;
}
if (oldViolation != null && freshViolation.isLessSerious(oldViolation)) return true;
}
}
return false;
}
boolean containsNewErrors(List<Violation> violations) {
for (Violation v : violations) {
int idx = originalViolations.indexOf(v);
if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true;
}
return false;
}
List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
List<Pair<Policy.ReplicaInfo, Row>> allPossibleReplicas = new ArrayList<>();
if (sortDesc) {
if (until == -1) until = getMatrix().size();
for (int i = 0; i < until; i++) addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
} else {
if (until == -1) until = 0;
for (int i = getMatrix().size() - 1; i >= until; i--)
addReplicaToList(getMatrix().get(i), isSource, allPossibleReplicas);
}
return allPossibleReplicas;
}
void addReplicaToList(Row r, boolean isSource, List<Pair<Policy.ReplicaInfo, Row>> replicaList) {
if (!isAllowed(r.node, isSource ? Hint.SRC_NODE : Hint.TARGET_NODE)) return;
for (Map.Entry<String, Map<String, List<Policy.ReplicaInfo>>> e : r.collectionVsShardVsReplicas.entrySet()) {
if (!isAllowed(e.getKey(), Hint.COLL)) continue;
for (Map.Entry<String, List<Policy.ReplicaInfo>> shard : e.getValue().entrySet()) {
if (!isAllowed(shard.getKey(), Hint.SHARD)) continue;
replicaList.add(new Pair<>(shard.getValue().get(0), r));
}
}
}
protected List<Violation> testChangedMatrix(boolean strict, List<Row> rows) {
List<Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) {
if (strict || clause.strict) {
List<Violation> errs = clause.test(rows);
if (!errs.isEmpty()) {
errors.addAll(errs);
}
}
}
return errors;
}
ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) {
ArrayList<Row> copy = new ArrayList<>(matrix);
copy.set(i, tmpRow);
return copy;
}
protected boolean isAllowed(Object v, Hint hint) {
Object hintVal = hints.get(hint);
return hintVal == null || Objects.equals(v, hintVal);
}
public enum Hint {
COLL, SHARD, SRC_NODE, TARGET_NODE
}
}
static List<Clause> mergePolicies(String coll,
List<Clause> collPolicy,
List<Clause> globalPolicy) {
List<Clause> merged = insertColl(coll, collPolicy);
List<Clause> global = insertColl(coll, globalPolicy);
merged.addAll(global.stream()
.filter(clusterPolicyClause -> merged.stream().noneMatch(perCollPolicy -> perCollPolicy.doesOverride(clusterPolicyClause)))
.collect(Collectors.toList()));
return merged;
}
/**
* Insert the collection name into the clauses where collection is not specified
*/
static List<Clause> insertColl(String coll, Collection<Clause> conditions) {
return conditions.stream()
.filter(Clause::isPerCollectiontag)
.map(clause -> {
Map<String, Object> copy = new LinkedHashMap<>(clause.original);
if (!copy.containsKey("collection")) copy.put("collection", coll);
return new Clause(copy);
})
.filter(it -> (it.collection.isPass(coll)))
.collect(Collectors.toList());
}
private static final Map<CollectionAction, Supplier<Suggester>> ops = new HashMap<>();
static {
ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester());
ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester());
}
}
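An end-to-end sketch of the intended flow; the JSON is illustrative and cdp stands for any ClusterDataProvider (same-package access assumed, since Row is package-private):

    Map<String, Object> json = (Map<String, Object>) Utils.fromJSONString(
        "{'cluster-preferences': [{'minimize': 'cores', 'precision': 1}]," +
        " 'cluster-policy': [{'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}]}");
    Policy policy = new Policy(json);
    Policy.Session session = policy.createSession(cdp);
    List<Clause.Violation> violations = session.getViolations();
    List<Row> sorted = session.getSorted(); // most loaded node first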

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
public class PolicyHelper {
public static Map<String, List<String>> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
ClusterDataProvider cdp,
Map<String, String> optionalPolicyMapping,
List<String> shardNames,
int repFactor) {
Map<String, List<String>> positionMapping = new HashMap<>();
for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
if (optionalPolicyMapping != null) {
final ClusterDataProvider delegate = cdp;
cdp = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return delegate.getNodeValues(node, tags);
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return delegate.getReplicaInfo(node, keys);
}
@Override
public Collection<String> getNodes() {
return delegate.getNodes();
}
@Override
public String getPolicyNameByCollection(String coll) {
return optionalPolicyMapping.containsKey(coll) ?
optionalPolicyMapping.get(coll) :
delegate.getPolicyNameByCollection(coll);
}
};
}
Policy policy = new Policy(autoScalingJson);
Policy.Session session = policy.createSession(cdp);
for (String shardName : shardNames) {
for (int i = 0; i < repFactor; i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL, collName)
.hint(Hint.SHARD, shardName);
SolrRequest op = suggester.getOperation();
if (op == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
}
session = suggester.getSession();
positionMapping.get(shardName).add(op.getParams().get(CoreAdminParams.NODE));
}
}
return positionMapping;
}
public List<Map> addNode(Map<String, Object> autoScalingJson, String node, ClusterDataProvider cdp) {
//todo
return null;
}
}
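A hedged sketch of computing replica placements for a new collection; names and counts are placeholders and cdp is any ClusterDataProvider.

    Map<String, Object> autoScalingJson = (Map<String, Object>) Utils.fromJSONString(
        "{'cluster-policy': [{'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}]}");
    Map<String, List<String>> positions = PolicyHelper.getReplicaLocations(
        "newColl", autoScalingJson, cdp, null, Arrays.asList("shard1", "shard2"), 2);
    // positions maps each shard name to the node names chosen for its replicas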

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
class Preference implements MapWriter {
final Policy.SortParam name;
Integer precision;
final Policy.Sort sort;
Preference next;
public int idx;
private final Map original;
Preference(Map<String, Object> m) {
this.original = Utils.getDeepCopy(m,3);
sort = Policy.Sort.get(m);
name = Policy.SortParam.get(m.get(sort.name()).toString());
Object p = m.getOrDefault("precision", 0);
precision = p instanceof Number ? ((Number) p).intValue() : Integer.parseInt(p.toString());
if (precision < 0) {
throw new RuntimeException("precision must be a positive value ");
}
if (precision < name.min || precision > name.max) {
throw new RuntimeException(StrUtils.formatString("invalid precision value {0}, must lie between {1} and {2}",
precision, name.min, name.max));
}
}
// There are two modes of compare:
// recursive: precision is used to detect ties, and a tie is broken by the next preference
// non-recursive: precision is ignored and the sort is done on the actual value
int compare(Row r1, Row r2, boolean useApprox) {
Object o1 = useApprox ? r1.cells[idx].approxVal : r1.cells[idx].val;
Object o2 = useApprox ? r2.cells[idx].approxVal : r2.cells[idx].val;
int result = 0;
if (o1 instanceof Integer && o2 instanceof Integer) result = ((Integer) o1).compareTo((Integer) o2);
if (o1 instanceof Long && o2 instanceof Long) result = ((Long) o1).compareTo((Long) o2);
if (o1 instanceof Float && o2 instanceof Float) result = ((Float) o1).compareTo((Float) o2);
if (o1 instanceof Double && o2 instanceof Double) result = ((Double) o1).compareTo((Double) o2);
return result == 0 ? next == null ? 0 : next.compare(r1, r2, useApprox) : sort.sortval * result;
}
//sets the approximate value (approxVal) according to the precision
void setApproxVal(List<Row> tmpMatrix) {
Object prevVal = null;
for (Row row : tmpMatrix) {
prevVal = row.cells[idx].approxVal =
prevVal == null || Math.abs(((Number) prevVal).longValue() - ((Number) row.cells[idx].val).longValue()) > precision ?
row.cells[idx].val :
prevVal;
}
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
for (Object o : original.entrySet()) {
Map.Entry e = (Map.Entry) o;
ew.put(String.valueOf(e.getKey()), e.getValue());
}
}
}
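To make the precision semantics concrete: with precision 10, two nodes whose values differ by at most 10 are treated as tied, and the tie falls through to the next preference. A same-package sketch (the constructor is package-private):

    Preference p = new Preference((Map<String, Object>) Utils.fromJSONString(
        "{'maximize': 'freedisk', 'precision': 10}"));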

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.cloud.autoscaling.Policy.ReplicaInfo;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
class Row implements MapWriter {
public final String node;
final Cell[] cells;
Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
List<Clause> violations = new ArrayList<>();
boolean anyValueMissing = false;
Row(String node, List<String> params, ClusterDataProvider dataProvider) {
collectionVsShardVsReplicas = dataProvider.getReplicaInfo(node, params);
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
this.node = node;
cells = new Cell[params.size()];
Map<String, Object> vals = dataProvider.getNodeValues(node, params);
for (int i = 0; i < params.size(); i++) {
String s = params.get(i);
cells[i] = new Cell(i, s, vals.get(s));
if (NODE.equals(s)) cells[i].val = node;
if (cells[i].val == null) anyValueMissing = true;
}
}
Row(String node, Cell[] cells, boolean anyValueMissing, Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, List<Clause> violations) {
this.node = node;
this.cells = new Cell[cells.length];
for (int i = 0; i < this.cells.length; i++) {
this.cells[i] = cells[i].copy();
}
this.anyValueMissing = anyValueMissing;
this.collectionVsShardVsReplicas = collectionVsShardVsReplicas;
this.violations = violations;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(node, (IteratorWriter) iw -> {
iw.add((MapWriter) e -> e.put("replicas", collectionVsShardVsReplicas));
for (Cell cell : cells) iw.add(cell);
});
}
Row copy() {
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), new ArrayList<>(violations));
}
Object getVal(String name) {
for (Cell cell : cells) if (cell.name.equals(name)) return cell.val;
return null;
}
@Override
public String toString() {
return node;
}
// this adds a replica to the replica info
Row addReplica(String coll, String shard) {
Row row = copy();
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
if (c == null) row.collectionVsShardVsReplicas.put(coll, c = new HashMap<>());
List<ReplicaInfo> replicas = c.get(shard);
if (replicas == null) c.put(shard, replicas = new ArrayList<>());
replicas.add(new ReplicaInfo("" + (new Random().nextInt(1000) + 1000), coll, shard, new HashMap<>())); // random placeholder name for the new replica
for (Cell cell : row.cells) {
if (cell.name.equals("cores")) cell.val = ((Number) cell.val).intValue() + 1;
}
return row;
}
Pair<Row, ReplicaInfo> removeReplica(String coll, String shard) {
Row row = copy();
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
List<ReplicaInfo> s = c.get(shard);
if (s == null || s.isEmpty()) return null;
return new Pair<>(row, s.remove(0));
}
}
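A minimal sketch of Row's copy-on-write behaviour (mirroring TestPolicy#testRow later in this commit); the node, collection and shard names are illustrative:

Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false,
    new HashMap<>(), new ArrayList<>());
Row r1 = row.addReplica("c1", "s1");  // returns a copy; 'row' itself is unchanged
Row r2 = r1.addReplica("c1", "s1");
// r1 now reports one replica for c1/s1, r2 reports two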
View File
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Common classes for autoscaling: parsing policies, filtering nodes and sorting
*/
package org.apache.solr.cloud.autoscaling;
View File
@@ -19,13 +19,16 @@ package org.apache.solr.common;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
/**
* Interface to help do push writing to an array
*/
public interface IteratorWriter {
/**
* @param iw after this method returns, the EntryWriter object is invalid
* @param iw after this method returns, the ItemWriter object is invalid
* Do not hold a reference to this object
*/
void writeIter(ItemWriter iw) throws IOException;
@@ -62,4 +65,20 @@ public interface IteratorWriter {
return this;
}
}
default List toList(List l) {
try {
writeIter(new ItemWriter() {
@Override
public ItemWriter add(Object o) throws IOException {
if (o instanceof MapWriter) o = ((MapWriter) o).toMap(new LinkedHashMap<>());
if (o instanceof IteratorWriter) o = ((IteratorWriter) o).toList(new ArrayList<>());
l.add(o);
return this;
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
return l;
}
}
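The new toList default materializes a push-style writer into a plain List, recursively converting nested MapWriter/IteratorWriter values. A minimal sketch with arbitrary values:

IteratorWriter iw = w -> {
  w.add("one");
  w.add((MapWriter) ew -> ew.put("k", 2));  // nested writers are converted too
};
List out = iw.toList(new ArrayList<>());    // ["one", {k=2}]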
View File
@@ -19,6 +19,8 @@ package org.apache.solr.common;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
/**
@@ -34,6 +36,8 @@ public interface MapWriter extends MapSerializable {
writeMap(new EntryWriter() {
@Override
public EntryWriter put(String k, Object v) throws IOException {
if (v instanceof MapWriter) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
if (v instanceof IteratorWriter) v = ((IteratorWriter) v).toList(new ArrayList<>());
map.put(k, v);
return this;
}
@@ -60,6 +64,12 @@ public interface MapWriter extends MapSerializable {
*/
EntryWriter put(String k, Object v) throws IOException;
default EntryWriter putIfNotNull(String k, Object v) throws IOException {
if(v != null) put(k,v);
return this;
}
default EntryWriter put(String k, int v) throws IOException {
put(k, (Integer) v);
return this;
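A sketch of the new putIfNotNull helper (names illustrative): entries with null values are skipped, which pairs well with the sparse node tags used by the autoscaling code.

MapWriter mw = ew -> {
  ew.put("node", "node1");
  ew.putIfNotNull("role", null);  // no entry is written for a null value
};
Map m = mw.toMap(new LinkedHashMap<>());  // {node=node1}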
View File
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -170,6 +171,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return slices.get(sliceName);
}
public void forEachReplica(BiConsumer<String, Replica> consumer) {
slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) -> consumer.accept(shard, replica)));
}
/**
* Gets the list of all slices for this collection.
*/
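A sketch of the new forEachReplica helper, assuming a DocCollection instance named docCollection:

docCollection.forEachReplica((shard, replica) ->
    System.out.println(shard + " -> " + replica.getName() + " on " + replica.getNodeName()));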
View File
@@ -32,6 +32,7 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Pattern;
@@ -44,6 +45,7 @@ import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -362,6 +364,19 @@ public class SolrZkClient implements Closeable {
}
}
public Map<String, Object> getJson(String path, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
byte[] bytes = null;
try {
bytes = getData(path, null, null, retryOnConnLoss);
} catch (KeeperException.NoNodeException e) {
return null;
}
if (bytes != null && bytes.length > 0) {
return (Map<String, Object>) Utils.fromJSON(bytes);
}
return null;
}
/**
* Returns node's state
*/
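A sketch of the new getJson helper (checked ZooKeeper exceptions elided); a missing or empty znode yields null rather than a NoNodeException:

Map<String, Object> conf = zkClient.getJson(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
if (conf == null) {
  // /autoscaling.json has not been created yet, or is empty
}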
View File
@@ -93,6 +93,7 @@ public class ZkStateReader implements Closeable {
public static final String CLUSTER_PROPS = "/clusterprops.json";
public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
View File
@@ -46,7 +46,10 @@ public class ImplicitSnitch extends Snitch {
public static final String CORES = "cores";
public static final String DISK = "freedisk";
public static final String ROLE = "role";
public static final String NODEROLE = "nodeRole";
public static final String SYSPROP = "sysprop.";
public static final String SYSLOADAVG = "sysLoadAvg";
public static final String HEAPUSAGE = "heapUsage";
public static final List<String> IP_SNITCHES = Collections.unmodifiableList(Arrays.asList("ip_1", "ip_2", "ip_3", "ip_4"));
public static final Set<String> tags = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(NODE, PORT, HOST, CORES, DISK, ROLE, "ip_1", "ip_2", "ip_3", "ip_4")));
@@ -61,9 +64,15 @@ public class ImplicitSnitch extends Snitch {
Matcher hostAndPortMatcher = hostAndPortPattern.matcher(solrNode);
if (hostAndPortMatcher.find()) ctx.getTags().put(PORT, hostAndPortMatcher.group(2));
}
if (requestedTags.contains(ROLE)) fillRole(solrNode, ctx);
if (requestedTags.contains(ROLE)) fillRole(solrNode, ctx, ROLE);
if (requestedTags.contains(NODEROLE)) fillRole(solrNode, ctx, NODEROLE); // for new policy framework
addIpTags(solrNode, requestedTags, ctx);
getRemoteInfo(solrNode, requestedTags, ctx);
}
protected void getRemoteInfo(String solrNode, Set<String> requestedTags, SnitchContext ctx) {
ModifiableSolrParams params = new ModifiableSolrParams();
if (requestedTags.contains(CORES)) params.add(CORES, "1");
if (requestedTags.contains(DISK)) params.add(DISK, "1");
@@ -73,7 +82,7 @@
if (params.size() > 0) ctx.invokeRemote(solrNode, params, "org.apache.solr.cloud.rule.ImplicitSnitch", null);
}
private void fillRole(String solrNode, SnitchContext ctx) {
private void fillRole(String solrNode, SnitchContext ctx, String key) {
Map roles = (Map) ctx.retrieve(ZkStateReader.ROLES); // we don't want to hit the ZK for each node
// so cache and reuse
if(roles == null) roles = ctx.getZkJson(ZkStateReader.ROLES);
@@ -83,7 +92,7 @@
Map.Entry e = (Map.Entry) o;
if (e.getValue() instanceof List) {
if(((List) e.getValue()).contains(solrNode)) {
ctx.getTags().put(ROLE, e.getKey());
ctx.getTags().put(key, e.getKey());
break;
}
}
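A sketch of the new nodeRole tag; the node name and roles content are illustrative, and ctx stands for some SnitchContext implementation:

// assumes /roles.json contains {"overseer":["node1:8983_solr"]}
Set<String> requested = new HashSet<>(Arrays.asList(ImplicitSnitch.NODEROLE));
new ImplicitSnitch().getTags("node1:8983_solr", requested, ctx);
// ctx.getTags() now contains {nodeRole=overseer}; the legacy "role" tag is unchanged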
View File
@@ -44,10 +44,6 @@ public abstract class SnitchContext implements RemoteCallback {
this.session = session;
}
public SnitchInfo getSnitchInfo() {
return snitchInfo;
}
public Map<String, Object> getTags() {
return tags;
}
View File
@@ -181,6 +181,8 @@ public interface CommonParams {
String AUTHC_PATH = "/admin/authentication";
String ZK_PATH = "/admin/zookeeper";
String METRICS_PATH = "/admin/metrics";
String AUTOSCALING_PATH = "/admin/autoscaling";
String AUTOSCALING_DIAGNOSTICS_PATH = "/admin/autoscaling/diagnostics";
Set<String> ADMIN_PATHS = new HashSet<>(Arrays.asList(
CORES_HANDLER_PATH,
@@ -188,7 +190,9 @@
CONFIGSETS_HANDLER_PATH,
AUTHC_PATH,
AUTHZ_PATH,
METRICS_PATH));
METRICS_PATH,
AUTOSCALING_PATH,
AUTOSCALING_DIAGNOSTICS_PATH));
/** valid values for: <code>echoParams</code> */
enum EchoParamStyle {
View File
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.noggit.JSONParser;
@@ -59,17 +60,18 @@ public class CommandOperation {
}
public boolean getBoolean(String key, boolean def) {
String v = getStr(key,null);
return v == null? def:Boolean.parseBoolean(v);
String v = getStr(key, null);
return v == null ? def : Boolean.parseBoolean(v);
}
public void setCommandData(Object o){
public void setCommandData(Object o) {
commandData = o;
}
public Map<String,Object> getDataMap() {
public Map<String, Object> getDataMap() {
if (commandData instanceof Map) {
//noinspection unchecked
return (Map<String,Object>)commandData;
return (Map<String, Object>) commandData;
}
addError(StrUtils.formatString("The command ''{0}'' should have the values as a json object {key:val} format", name));
return Collections.emptyMap();
@@ -89,7 +91,7 @@
}
private Object getMapVal(String key) {
if("".equals(key)){
if ("".equals(key)) {
if (commandData instanceof Map) {
addError("value of the command is an object should be primitive");
}
@@ -183,10 +185,10 @@
* Get all the values from the metadata for the command
* without the specified keys
*/
public Map<String,Object> getValuesExcluding(String... keys) {
public Map<String, Object> getValuesExcluding(String... keys) {
getMapVal(null);
if (hasError()) return emptyMap();//just to verify the type is Map
@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked")
LinkedHashMap<String, Object> cp = new LinkedHashMap<>((Map<String, Object>) commandData);
if (keys == null) return cp;
for (String key : keys) {
@@ -213,11 +215,19 @@
return errors;
}
public static List<CommandOperation> parse(Reader rdr) throws IOException {
return parse(rdr, Collections.emptySet());
}
/**
* Parse the command operations into command objects
*
* @param rdr The payload
* @param singletonCommands commands that cannot be repeated
* @return parsed list of commands
*/
public static List<CommandOperation> parse(Reader rdr) throws IOException {
public static List<CommandOperation> parse(Reader rdr, Set<String> singletonCommands) throws IOException {
JSONParser parser = new JSONParser(rdr);
ObjectBuilder ob = new ObjectBuilder(parser);
@@ -232,7 +242,7 @@
Object key = ob.getKey();
ev = parser.nextEvent();
Object val = ob.getVal();
if (val instanceof List) {
if (val instanceof List && !singletonCommands.contains(key)) {
List list = (List) val;
for (Object o : list) {
if (!(o instanceof Map)) {
@@ -270,7 +280,21 @@
return new String(toJSON(singletonMap(name, commandData)), StandardCharsets.UTF_8);
}
public static List<CommandOperation> readCommands(Iterable<ContentStream> streams, NamedList resp)
public static List<CommandOperation> readCommands(Iterable<ContentStream> streams, NamedList resp) throws IOException {
return readCommands(streams, resp, Collections.emptySet());
}
/**
* Read commands from request streams
*
* @param streams the streams
* @param resp solr query response
* @param singletonCommands commands that cannot be repeated
* @return parsed list of commands
* @throws IOException if there is an error while parsing the stream
*/
public static List<CommandOperation> readCommands(Iterable<ContentStream> streams, NamedList resp, Set<String> singletonCommands)
throws IOException {
if (streams == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
@@ -278,7 +302,7 @@
ArrayList<CommandOperation> ops = new ArrayList<>();
for (ContentStream stream : streams)
ops.addAll(parse(stream.getReader()));
ops.addAll(parse(stream.getReader(), singletonCommands));
List<Map> errList = CommandOperation.captureErrors(ops);
if (!errList.isEmpty()) {
resp.add(CommandOperation.ERR_MSGS, errList);
@@ -312,7 +336,7 @@
public Integer getInt(String name) {
Object o = getVal(name);
if(o == null) return null;
if (o == null) return null;
return getInt(name, null);
}
}
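A sketch of the singleton-command variant of parse; "set-cluster-policy" is used here only as an illustrative command whose value is itself a JSON array. Without the singleton hint, parse() would explode that array into repeated commands:

Reader rdr = new StringReader(
    "{\"set-cluster-policy\": [{\"replica\":\"<2\", \"node\":\"#ANY\"}]}");
List<CommandOperation> ops =
    CommandOperation.parse(rdr, Collections.singleton("set-cluster-policy"));
// ops.size() == 1 and the single operation's payload is the entire array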
View File
@@ -33,6 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.EnumFieldValue;
import org.apache.solr.common.IteratorWriter;
@@ -390,7 +392,18 @@ public class JavaBinCodec implements PushWriter {
writeMap(((MapSerializable) val).toMap(new NamedList().asShallowMap()));
return true;
}
if (val instanceof AtomicInteger) {
writeInt(((AtomicInteger) val).get());
return true;
}
if (val instanceof AtomicLong) {
writeLong(((AtomicLong) val).get());
return true;
}
if (val instanceof AtomicBoolean) {
writeBoolean(((AtomicBoolean) val).get());
return true;
}
return false;
}
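With these additions the atomic wrappers round-trip through javabin as their primitive values; a sketch (IOException handling elided):

ByteArrayOutputStream baos = new ByteArrayOutputStream();
new JavaBinCodec().marshal(new AtomicInteger(5), baos);
Object val = new JavaBinCodec().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
// val is the Integer 5; AtomicLong and AtomicBoolean behave analogously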
View File
@@ -31,11 +31,15 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.noggit.CharArr;
import org.noggit.JSONParser;
@@ -51,31 +55,58 @@ public class Utils {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static Map getDeepCopy(Map map, int maxDepth) {
return getDeepCopy(map, maxDepth, true);
return getDeepCopy(map, maxDepth, true, false);
}
public static Map getDeepCopy(Map map, int maxDepth, boolean mutable) {
return getDeepCopy(map, maxDepth, mutable, false);
}
public static Map getDeepCopy(Map map, int maxDepth, boolean mutable, boolean sorted) {
if(map == null) return null;
if (maxDepth < 1) return map;
Map copy = new LinkedHashMap();
Map copy;
if (sorted) {
copy = new TreeMap();
} else {
copy = new LinkedHashMap();
}
for (Object o : map.entrySet()) {
Map.Entry e = (Map.Entry) o;
Object v = e.getValue();
if (v instanceof Map) v = getDeepCopy((Map) v, maxDepth - 1, mutable);
else if (v instanceof Collection) v = getDeepCopy((Collection) v, maxDepth - 1, mutable);
copy.put(e.getKey(), v);
copy.put(e.getKey(), makeDeepCopy(e.getValue(), maxDepth, mutable, sorted));
}
return mutable ? copy : Collections.unmodifiableMap(copy);
}
public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable) {
if (c == null || maxDepth < 1) return c;
Collection result = c instanceof Set ? new HashSet() : new ArrayList();
for (Object o : c) {
if (o instanceof Map) {
o = getDeepCopy((Map) o, maxDepth - 1, mutable);
private static Object makeDeepCopy(Object v, int maxDepth, boolean mutable, boolean sorted) {
if (v instanceof MapWriter && maxDepth > 1) {
v = ((MapWriter) v).toMap(new LinkedHashMap<>());
} else if (v instanceof IteratorWriter && maxDepth > 1) {
v = ((IteratorWriter) v).toList(new ArrayList<>());
if (sorted) {
Collections.sort((List)v);
}
result.add(o);
}
if (v instanceof Map) {
v = getDeepCopy((Map) v, maxDepth - 1, mutable, sorted);
} else if (v instanceof Collection) {
v = getDeepCopy((Collection) v, maxDepth - 1, mutable, sorted);
}
return v;
}
public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable) {
return getDeepCopy(c, maxDepth, mutable, false);
}
public static Collection getDeepCopy(Collection c, int maxDepth, boolean mutable, boolean sorted) {
if (c == null || maxDepth < 1) return c;
Collection result = c instanceof Set ?
(sorted ? new TreeSet() : new HashSet()) : new ArrayList();
for (Object o : c) result.add(makeDeepCopy(o, maxDepth, mutable, sorted));
if (sorted && (result instanceof List)) {
Collections.sort((List)result);
}
return mutable ? result : result instanceof Set ? unmodifiableSet((Set) result) : unmodifiableList((List) result);
}
@@ -83,6 +114,13 @@ public class Utils {
public static byte[] toJSON(Object o) {
if(o == null) return new byte[0];
CharArr out = new CharArr();
if (!(o instanceof List) && !(o instanceof Map)) {
if (o instanceof MapWriter) {
o = ((MapWriter)o).toMap(new LinkedHashMap<>());
} else if(o instanceof IteratorWriter){
o = ((IteratorWriter)o).toList(new ArrayList<>());
}
}
new JSONWriter(out, 2).write(o); // indentation by default
return toUTF8(out);
}
@@ -112,12 +150,18 @@
}
public static Map<String, Object> makeMap(Object... keyVals) {
return makeMap(false, keyVals);
}
public static Map<String, Object> makeMap(boolean skipNulls, Object... keyVals) {
if ((keyVals.length & 0x01) != 0) {
throw new IllegalArgumentException("arguments should be key,value");
}
Map<String, Object> propMap = new LinkedHashMap<>(keyVals.length >> 1);
for (int i = 0; i < keyVals.length; i += 2) {
propMap.put(keyVals[i].toString(), keyVals[i + 1]);
Object keyVal = keyVals[i + 1];
if (skipNulls && keyVal == null) continue;
propMap.put(keyVals[i].toString(), keyVal);
}
return propMap;
}
@@ -152,6 +196,7 @@
}
public static Object getObjectByPath(Map root, boolean onlyPrimitive, List<String> hierarchy) {
if(root == null) return null;
Map obj = root;
for (int i = 0; i < hierarchy.size(); i++) {
int idx = -1;
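A sketch of the widened Utils helpers: sorted=true swaps LinkedHashMap/HashSet for TreeMap/TreeSet so copies iterate in natural key order, and makeMap gains a skipNulls flag (values here are arbitrary):

Map copy = Utils.getDeepCopy(Utils.makeMap("b", 2, "a", 1), 2, true, true);  // iterates as {a=1, b=2}
Map noNulls = Utils.makeMap(true, "a", 1, "b", null);                        // {a=1}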
View File
@@ -0,0 +1,520 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.Clause.Violation;
import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class TestPolicy extends SolrTestCaseJ4 {
public static String clusterState = "{'gettingstarted':{" +
" 'router':{'name':'compositeId'}," +
" 'shards':{" +
" 'shard1':{" +
" 'range':'80000000-ffffffff'," +
" 'replicas':{" +
" 'r1':{" +
" 'core':r1," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r2':{" +
" 'core':r2," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node2'," +
" 'state':'active'}}}," +
" 'shard2':{" +
" 'range':'0-7fffffff'," +
" 'replicas':{" +
" 'r3':{" +
" 'core':r3," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r4':{" +
" 'core':r4," +
" 'base_url':'http://10.0.0.4:8987/solr'," +
" 'node_name':'node4'," +
" 'state':'active'}," +
" 'r6':{" +
" 'core':r6," +
" 'base_url':'http://10.0.0.4:8989/solr'," +
" 'node_name':'node3'," +
" 'state':'active'}," +
" 'r5':{" +
" 'core':r5," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node1'," +
" 'state':'active'}}}}}}";
public static Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaDetails(String node, String s) {
ValidatingJsonMap m = ValidatingJsonMap
.getDeepCopy((Map) Utils.fromJSONString(s), 6, true);
Map<String, Map<String, List<Policy.ReplicaInfo>>> result = new LinkedHashMap<>();
m.forEach((collName, o) -> {
ValidatingJsonMap coll = (ValidatingJsonMap) o;
coll.getMap("shards").forEach((shard, o1) -> {
ValidatingJsonMap sh = (ValidatingJsonMap) o1;
sh.getMap("replicas").forEach((replicaName, o2) -> {
ValidatingJsonMap r = (ValidatingJsonMap) o2;
String node_name = (String) r.get("node_name");
if (!node_name.equals(node)) return;
Map<String, List<Policy.ReplicaInfo>> shardVsReplicaStats = result.get(collName);
if (shardVsReplicaStats == null) result.put(collName, shardVsReplicaStats = new HashMap<>());
List<Policy.ReplicaInfo> replicaInfos = shardVsReplicaStats.get(shard);
if (replicaInfos == null) shardVsReplicaStats.put(shard, replicaInfos = new ArrayList<>());
replicaInfos.add(new Policy.ReplicaInfo(replicaName, collName, shard, new HashMap<>()));
});
});
});
return result;
}
public void testOperands() {
Clause c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:'<2', node:'#ANY'}"));
assertFalse(c.replica.isPass(3));
assertFalse(c.replica.isPass(2));
assertTrue(c.replica.isPass(1));
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:'>2', node:'#ANY'}"));
assertTrue(c.replica.isPass(3));
assertFalse(c.replica.isPass(2));
assertFalse(c.replica.isPass(1));
c = new Clause((Map<String, Object>) Utils.fromJSONString("{replica:0, nodeRole:'!overseer'}"));
assertTrue(c.tag.isPass("OVERSEER"));
assertFalse(c.tag.isPass("overseer"));
}
public void testRow() {
Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), new ArrayList<>());
Row r1 = row.addReplica("c1", "s1");
Row r2 = r1.addReplica("c1", "s1");
assertEquals(1, r1.collectionVsShardVsReplicas.get("c1").get("s1").size());
assertEquals(2, r2.collectionVsShardVsReplicas.get("c1").get("s1").size());
assertTrue(r2.collectionVsShardVsReplicas.get("c1").get("s1").get(0) instanceof Policy.ReplicaInfo);
assertTrue(r2.collectionVsShardVsReplicas.get("c1").get("s1").get(1) instanceof Policy.ReplicaInfo);
}
public void testMerge() {
Map map = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
" { 'minimize': 'cores', 'precision': 50}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}," +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
" ]," +
" 'policies': {" +
" 'policy1': [" +
" { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}," +
" { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'}," +
" { 'replica': '<2', 'shard': '#EACH', 'rack': 'rack1'}" +
" ]" +
" }" +
"}");
Policy policy = new Policy(map);
List<Clause> clauses = Policy.mergePolicies("mycoll", policy.policies.get("policy1"), policy.clusterPolicy);
Collections.sort(clauses);
assertEquals(clauses.size(), 4);
assertEquals("1", String.valueOf(clauses.get(0).original.get("replica")));
assertEquals("0", String.valueOf(clauses.get(1).original.get("replica")));
assertEquals("#ANY", clauses.get(3).original.get("shard"));
assertEquals("rack1", clauses.get(2).original.get("rack"));
assertEquals("overseer", clauses.get(1).original.get("nodeRole"));
}
public void testConditionsSort() {
String rules = "{" +
" 'cluster-policy':[" +
" { 'nodeRole':'overseer', replica: 0, 'strict':false}," +
" { 'replica':'<1', 'node':'node3', 'shard':'#EACH'}," +
" { 'replica':'<2', 'node':'#ANY', 'shard':'#EACH'}," +
" { 'replica':1, 'rack':'rack1'}]" +
" }";
Policy p = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
List<Clause> clauses = new ArrayList<>(p.getClusterPolicy());
Collections.sort(clauses);
assertEquals("nodeRole", clauses.get(1).tag.name);
assertEquals("rack", clauses.get(0).tag.name);
}
public void testRules() throws IOException {
String rules = "{" +
"cluster-policy:[" +
"{nodeRole:'overseer',replica : 0 , strict:false}," +
"{replica:'<1',node:node3}," +
"{replica:'<2',node:'#ANY', shard:'#EACH'}]," +
" cluster-preferences:[" +
"{minimize:cores , precision:2}," +
"{maximize:freedisk, precision:50}, " +
"{minimize:heapUsage, precision:1000}]}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heapUsage:10480}," +
"node2:{cores:4, freedisk: 749, heapUsage:6873}," +
"node3:{cores:7, freedisk: 262, heapUsage:7834}," +
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer}" +
"}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
Policy.Session session;
session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
List<Row> l = session.getSorted();
assertEquals("node1", l.get(0).node);
assertEquals("node3", l.get(1).node);
assertEquals("node4", l.get(2).node);
assertEquals("node2", l.get(3).node);
List<Violation> violations = session.getViolations();
assertEquals(3, violations.size());
assertTrue(violations.stream().anyMatch(violation -> "node3".equals(violation.getClause().tag.val)));
assertTrue(violations.stream().anyMatch(violation -> "nodeRole".equals(violation.getClause().tag.name)));
assertTrue(violations.stream().anyMatch(violation -> (violation.getClause().replica.op == Operand.LESS_THAN && "node".equals(violation.getClause().tag.name))));
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "gettingstarted")
.hint(Hint.SHARD, "r1");
SolrParams operation = suggester.getOperation().getParams();
assertEquals("node2", operation.get("node"));
nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heapUsage:10480}," +
"node2:{cores:4, freedisk: 749, heapUsage:6873}," +
"node3:{cores:7, freedisk: 262, heapUsage:7834}," +
"node5:{cores:0, freedisk: 895, heapUsage:17834}," +
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer}" +
"}");
session = policy.createSession(getClusterDataProvider(nodeValues, clusterState));
SolrRequest opReq = session.getSuggester(MOVEREPLICA)
.hint(Hint.TARGET_NODE, "node5")
.getOperation();
assertNotNull(opReq);
assertEquals("node5", opReq.getParams().get("targetNode"));
}
public void testGreedyConditions() {
String autoscaleJson = "{" +
" 'cluster-policy':[" +
" {'cores':'<10','node':'#ANY'}," +
" {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
" { 'replica': 2, 'sysprop.fs': 'ssd', 'shard': '#EACH'}," +//greedy condition
" {'nodeRole':'overseer','replica':'0'}]," +
" 'cluster-preferences':[" +
" {'minimize':'cores', 'precision':3}," +
" {'maximize':'freedisk','precision':100}]}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heapUsage:10480, rack: rack4}," +
"node2:{cores:4, freedisk: 749, heapUsage:6873, rack: rack3}," +
"node3:{cores:7, freedisk: 262, heapUsage:7834, rack: rack2, sysprop.fs : ssd}," +
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1}" +
"}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState);
ClusterDataProvider cdp = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return clusterDataProvider.getNodeValues(node, tags);
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return clusterDataProvider.getReplicaInfo(node, keys);
}
@Override
public Collection<String> getNodes() {
return clusterDataProvider.getNodes();
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
}
};
Policy.Session session = policy.createSession(cdp);
Policy.Suggester suggester = session.getSuggester(ADDREPLICA);
SolrRequest op = suggester
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1")
.getOperation();
assertNotNull(op);
assertEquals("node3", op.getParams().get("node"));
suggester = suggester
.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1");
op = suggester.getOperation();
assertNotNull(op);
assertEquals("node3", op.getParams().get("node"));
suggester = suggester
.getSession()
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "shard1");
op = suggester.getOperation();
assertNotNull(op);
assertEquals("node2", op.getParams().get("node"));
}
public void testMoveReplica() {
String autoscaleJson = "{" +
" 'cluster-policy':[" +
" {'cores':'<10','node':'#ANY'}," +
" {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
" {'nodeRole':'overseer','replica':'0'}]," +
" 'cluster-preferences':[" +
" {'minimize':'cores', 'precision':3}," +
" {'maximize':'freedisk','precision':100}]}";
Map replicaInfoMap = (Map) Utils.fromJSONString("{ '127.0.0.1:60099_solr':{}," +
" '127.0.0.1:60089_solr':{'compute_plan_action_test':{'shard1':[" +
" {'core_node1':{}}," +
" {'core_node2':{}}]}}}");
Map m = (Map) Utils.getObjectByPath(replicaInfoMap, false, "127.0.0.1:60089_solr/compute_plan_action_test");
m.put("shard1", Arrays.asList(
new Policy.ReplicaInfo("core_node1", "compute_plan_action_test", "shard1", Collections.emptyMap()),
new Policy.ReplicaInfo("core_node2", "compute_plan_action_test", "shard1", Collections.emptyMap())
));
Map<String, Map<String, Object>> tagsMap = (Map) Utils.fromJSONString("{" +
"    '127.0.0.1:60099_solr':{" +
"        'cores':0," +
"        'freedisk':918005641216}," +
"    '127.0.0.1:60089_solr':{" +
"        'cores':2," +
"        'freedisk':918005641216}}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
Policy.Session session = policy.createSession(new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return tagsMap.get(node);
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return (Map<String, Map<String, List<Policy.ReplicaInfo>>>) replicaInfoMap.get(node);
}
@Override
public Collection<String> getNodes() {
return replicaInfoMap.keySet();
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
}
});
Policy.Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
SolrParams op = suggester.getOperation().getParams();
assertNotNull(op);
session = suggester.getSession();
suggester = session.getSuggester(MOVEREPLICA).hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
op = suggester.getOperation().getParams();
assertNotNull(op);
}
public void testOtherTag() {
String rules = "{" +
"'cluster-preferences':[" +
"{'minimize':'cores','precision':2}," +
"{'maximize':'freedisk','precision':50}," +
"{'minimize':'heapUsage','precision':1000}" +
"]," +
"'cluster-policy':[" +
"{replica:0, 'nodeRole':'overseer','strict':false}," +
"{'replica':'<1','node':'node3'}," +
"{'replica':'<2','node':'#ANY','shard':'#EACH'}" +
"]," +
"'policies':{" +
"'p1':[" +
"{replica:0, 'nodeRole':'overseer','strict':false}," +
"{'replica':'<1','node':'node3'}," +
"{'replica':'<2','node':'#ANY','shard':'#EACH'}," +
"{'replica':'<3','shard':'#EACH','rack':'#ANY'}" +
"]" +
"}" +
"}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heapUsage:10480, rack: rack4}," +
"node2:{cores:4, freedisk: 749, heapUsage:6873, rack: rack3}," +
"node3:{cores:7, freedisk: 262, heapUsage:7834, rack: rack2}," +
"node4:{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1}" +
"}");
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
ClusterDataProvider clusterDataProvider = getClusterDataProvider(nodeValues, clusterState);
ClusterDataProvider cdp = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return clusterDataProvider.getNodeValues(node, tags);
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return clusterDataProvider.getReplicaInfo(node, keys);
}
@Override
public Collection<String> getNodes() {
return clusterDataProvider.getNodes();
}
@Override
public String getPolicyNameByCollection(String coll) {
return "p1";
}
};
Policy.Session session = policy.createSession(cdp);
CollectionAdminRequest.AddReplica op = (CollectionAdminRequest.AddReplica) session
.getSuggester(ADDREPLICA)
.hint(Hint.COLL, "newColl")
.hint(Hint.SHARD, "s1").getOperation();
assertNotNull(op);
assertEquals("node2", op.getNode());
}
private ClusterDataProvider getClusterDataProvider(final Map<String, Map> nodeValues, String clusterState) {
return new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
Map<String, Object> result = new LinkedHashMap<>();
tags.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Collection<String> getNodes() {
return nodeValues.keySet();
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
}
};
}
public void testMultiReplicaPlacement() {
String autoScaleJson = "{" +
" 'cluster-preferences': [" +
" { maximize : freedisk , precision: 50}," +
" { minimize : cores, precision: 2}" +
" ]," +
" 'cluster-policy': [" +
" { replica : '0' , 'nodeRole': 'overseer'}," +
" { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'" +
" }" +
" ]," +
" 'policies': {" +
" 'policy1': [" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" { 'replica': '<2', 'shard': '#EACH', 'rack': 'rack1'}" +
// " { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}" +
" ]" +
" }" +
"}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heap:10480, rack:rack3}," +
"node2:{cores:4, freedisk: 749, heap:6873, sysprop.fs : ssd, rack:rack1}," +
"node3:{cores:7, freedisk: 262, heap:7834, rack:rack4}," +
"node4:{cores:0, freedisk: 900, heap:16900, nodeRole:overseer, rack:rack2}" +
"}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
}
@Override
public Collection<String> getNodes() {
return Arrays.asList("node1", "node2", "node3", "node4");
}
};
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3);
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("node2", "node1", "node3")));
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("node2", "node1", "node3")));
}
}