mirror of
https://github.com/apache/lucene.git
synced 2025-02-24 11:16:35 +00:00
SOLR-9743: A new UTILIZENODE command
This commit is contained in:
parent
4ebac27d13
commit
2bde0eab3f
@ -95,6 +95,8 @@ New Features
|
||||
in the same Alias defined set based on a time field (currently an internal feature).
|
||||
(David Smiley)
|
||||
|
||||
* SOLR-9743: A new UTILIZENODE command (noble)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
@ -214,6 +214,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
||||
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
|
||||
.put(ADDREPLICA, new AddReplicaCmd(this))
|
||||
.put(MOVEREPLICA, new MoveReplicaCmd(this))
|
||||
.put(UTILIZENODE, new UtilizeNodeCmd(this))
|
||||
.build()
|
||||
;
|
||||
}
|
||||
|
121
solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
Normal file
121
solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
Normal file
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
|
||||
import org.apache.solr.client.solrj.request.V2Request;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.params.AutoScalingParams.NODE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public UtilizeNodeCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ocmh.checkRequired(message, NODE);
|
||||
String nodeName = message.getStr(NODE);
|
||||
String async = message.getStr(ASYNC);
|
||||
AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
|
||||
|
||||
//first look for any violation that may use this replica
|
||||
List<ZkNodeProps> requests = new ArrayList<>();
|
||||
//first look for suggestions if any
|
||||
List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(autoScalingConfig, ocmh.overseer.getSolrCloudManager());
|
||||
for (Suggester.SuggestionInfo suggestionInfo : suggestions) {
|
||||
log.info("op: " + suggestionInfo.getOperation());
|
||||
String coll = null;
|
||||
List<String> pieces = StrUtils.splitSmart(suggestionInfo.getOperation().getPath(), '/');
|
||||
if (pieces.size() > 1) {
|
||||
coll = pieces.get(2);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
log.info("coll: " + coll);
|
||||
state.getCollection(coll).forEachReplica((s, r) -> {
|
||||
if (Objects.equals(r.getName(), r.getName())) {
|
||||
log.info("replica to be moved " + r);
|
||||
}
|
||||
});
|
||||
if (suggestionInfo.getOperation() instanceof V2Request) {
|
||||
String targetNode = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/targetNode");
|
||||
if (Objects.equals(targetNode, nodeName)) {
|
||||
String replica = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/replica");
|
||||
requests.add(new ZkNodeProps(COLLECTION_PROP, coll,
|
||||
CollectionParams.TARGET_NODE, targetNode,
|
||||
ASYNC, async,
|
||||
REPLICA_PROP, replica));
|
||||
}
|
||||
}
|
||||
}
|
||||
executeAll(requests);
|
||||
Policy.Session session = autoScalingConfig.getPolicy().createSession(ocmh.overseer.getSolrCloudManager());
|
||||
for (; ; ) {
|
||||
Suggester suggester = session.getSuggester(MOVEREPLICA)
|
||||
.hint(Suggester.Hint.TARGET_NODE, nodeName);
|
||||
session = suggester.getSession();
|
||||
SolrRequest request = suggester.getSuggestion();
|
||||
if (request == null) break;
|
||||
requests.add(new ZkNodeProps(COLLECTION_PROP, request.getParams().get(COLLECTION_PROP),
|
||||
CollectionParams.TARGET_NODE, request.getParams().get(CollectionParams.TARGET_NODE),
|
||||
REPLICA_PROP, request.getParams().get(REPLICA_PROP),
|
||||
ASYNC, request.getParams().get(ASYNC)));
|
||||
}
|
||||
|
||||
|
||||
executeAll(requests);
|
||||
}
|
||||
|
||||
private void executeAll(List<ZkNodeProps> requests) throws Exception {
|
||||
if (requests.isEmpty()) return;
|
||||
for (ZkNodeProps props : requests) {
|
||||
NamedList result = new NamedList();
|
||||
ocmh.commandMap.get(MOVEREPLICA)
|
||||
.call(ocmh.overseer.getSolrCloudManager().getClusterStateProvider().getClusterState(),
|
||||
props,
|
||||
result);
|
||||
}
|
||||
requests.clear();
|
||||
}
|
||||
|
||||
}
|
@ -65,6 +65,7 @@ import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.common.params.CollectionAdminParams;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
@ -680,6 +681,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
||||
new ZkNodeProps(all)).getClusterStatus(rsp.getValues());
|
||||
return null;
|
||||
}),
|
||||
UTILIZENODE_OP(UTILIZENODE, (req, rsp, h) -> {
|
||||
return req.getParams().required().getAll(null, AutoScalingParams.NODE);
|
||||
}),
|
||||
ADDREPLICAPROP_OP(ADDREPLICAPROP, (req, rsp, h) -> {
|
||||
Map<String, Object> map = req.getParams().required().getAll(null,
|
||||
COLLECTION_PROP,
|
||||
|
116
solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java
Normal file
116
solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java
Normal file
@ -0,0 +1,116 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
|
||||
public class TestUtilizeNode extends SolrCloudTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
|
||||
.configure();
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
JettySolrRunner overseerJetty = null;
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerJetty = jetty;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (overseerJetty == null) {
|
||||
fail("no overseer leader!");
|
||||
}
|
||||
}
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
cluster.waitForAllNodes(5000);
|
||||
int REPLICATION = 2;
|
||||
String coll = "utilizenodecoll";
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
|
||||
cloudClient.request(create);
|
||||
|
||||
JettySolrRunner runner = cluster.startJettySolrRunner();
|
||||
cluster.waitForAllNodes(30);
|
||||
|
||||
|
||||
cloudClient.request(new CollectionAdminRequest.UtilizeNode(runner.getNodeName()));
|
||||
|
||||
|
||||
assertTrue(getReplicaCount(cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(coll), runner.getNodeName()) > 0);
|
||||
|
||||
String setClusterPolicyCommand = "{" +
|
||||
" 'set-cluster-policy': [" +
|
||||
" {'port':" + runner.getLocalPort() +
|
||||
", 'replica':0}" +
|
||||
" ]" +
|
||||
"}";
|
||||
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
|
||||
cloudClient.request(req);
|
||||
NamedList<Object> response = cloudClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
JettySolrRunner runner2 = cluster.startJettySolrRunner();
|
||||
cluster.waitForAllNodes(30);
|
||||
|
||||
cloudClient.request(new CollectionAdminRequest.UtilizeNode(runner2.getNodeName()));
|
||||
assertTrue("no replica should be present in "+runner.getNodeName(),getReplicaCount(cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(coll), runner.getNodeName()) == 0);
|
||||
|
||||
assertTrue(getReplicaCount(cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(coll), runner2.getNodeName()) > 0);
|
||||
|
||||
}
|
||||
|
||||
private int getReplicaCount(DocCollection collection, String node) {
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
|
||||
collection.forEachReplica((s, replica) -> {
|
||||
if (replica.getNodeName().equals(node)) {
|
||||
count.incrementAndGet();
|
||||
}
|
||||
});
|
||||
return count.get();
|
||||
}
|
||||
|
||||
}
|
@ -440,4 +440,9 @@ public class Policy implements MapWriter {
|
||||
static int compareRows(Row r1, Row r2, Policy policy) {
|
||||
return policy.clusterPreferences.get(0).compare(r1, r2, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
|
||||
/* A suggester is capable of suggesting a collection operation
|
||||
* given a particular session. Before it suggests a new operation,
|
||||
@ -147,6 +148,11 @@ public abstract class Suggester {
|
||||
ew.putIfNotNull("violation", violation);
|
||||
ew.put("operation", operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
}
|
||||
|
||||
//check if the fresh set of violations is less serious than the last set of violations
|
||||
|
@ -225,6 +225,11 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Utils.toJSONString(this);
|
||||
}
|
||||
|
||||
static class ClientSnitchCtx
|
||||
extends SnitchContext {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
@ -598,6 +598,19 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
||||
return params;
|
||||
}
|
||||
|
||||
}
|
||||
public static class UtilizeNode extends AsyncCollectionAdminRequest {
|
||||
protected String node;
|
||||
|
||||
public UtilizeNode(String node) {
|
||||
super(CollectionAction.UTILIZENODE);
|
||||
this.node = node;
|
||||
}
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
return ((ModifiableSolrParams) super.getParams()).set("node", node);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class MoveReplica extends AsyncCollectionAdminRequest {
|
||||
|
@ -179,6 +179,10 @@ public class CollectionApiMapping {
|
||||
POST,
|
||||
CLUSTERPROP,
|
||||
"set-property",null),
|
||||
UTILIZE_NODE(CLUSTER_CMD,
|
||||
POST,
|
||||
UTILIZENODE,
|
||||
"utilize-node",null),
|
||||
|
||||
BACKUP_COLLECTION(COLLECTIONS_COMMANDS,
|
||||
POST,
|
||||
|
@ -105,6 +105,7 @@ public interface CollectionParams {
|
||||
CREATESNAPSHOT(true, LockLevel.COLLECTION),
|
||||
DELETESNAPSHOT(true, LockLevel.COLLECTION),
|
||||
LISTSNAPSHOTS(false, LockLevel.NONE),
|
||||
UTILIZENODE(false, LockLevel.NONE),
|
||||
//only for testing. it just waits for specified time
|
||||
// these are not exposed via collection API commands
|
||||
// but the overseer is aware of these tasks
|
||||
|
@ -69,6 +69,28 @@
|
||||
"name",
|
||||
"val"
|
||||
]
|
||||
},
|
||||
"utilize-node": {
|
||||
"type": "object",
|
||||
"documentation": "https://lucene.apache.org/solr/guide/collections-api.html#utilizenode",
|
||||
"description": "use a replica to reduce load",
|
||||
"properties": {
|
||||
"node": {
|
||||
"type": "string",
|
||||
"description": "The name of the node"
|
||||
},
|
||||
"async": {
|
||||
"type": [
|
||||
"string",
|
||||
"boolean",
|
||||
"null"
|
||||
],
|
||||
"description": "The value of the property. If the value is empty or null, the property is unset."
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"name"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1576,6 +1576,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
||||
assertEquals(2, violations.size());
|
||||
List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(cfg, cloudManagerWithData(dataproviderdata));
|
||||
assertEquals(4, suggestions.size());
|
||||
for (Suggester.SuggestionInfo suggestionInfo : suggestions) {
|
||||
assertEquals(suggestionInfo.operation.getPath(), "/c/c1");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user