mirror of https://github.com/apache/lucene.git
SOLR-10496: New ComputePlanAction for autoscaling which uses the policy framework to compute cluster operations upon a trigger fire
Squashed commit of the following: commit 8a5239d000f0090867f7db00983e335e4bdc931a Author: Shalin Shekhar Mangar <shalin@apache.org> Date: Fri Jun 23 17:52:01 2017 +0530 SOLR-10496: New ComputePlanAction for autoscaling which uses the policy framework to compute cluster operations upon a trigger fire commit 70f60a23bc1e706abb41a4900c65305f1763f8f6 Author: Shalin Shekhar Mangar <shalin@apache.org> Date: Fri Jun 23 07:07:26 2017 +0530 SOLR-10496: Remove debugging info from tests. Enable recursive suggestion of operations. commit b023b011934be9ea411e148538daaa0a0b1d2052 Author: Shalin Shekhar Mangar <shalin@apache.org> Date: Fri Jun 23 00:18:51 2017 +0530 SOLR-10496: Fix test failure on nodeLost event commit b78e7ff589510315463488cd1ed79244d9d6e6ad Author: Noble Paul <noble@apache.org> Date: Fri Jun 23 00:34:01 2017 +0930 SOLR-10496: MOVEREPLICA suggester for dead node is not working commit cb665a1b35cbd1826c58f8d4ff8f20eb37bc5f8f Author: Shalin Shekhar Mangar <shalin@apache.org> Date: Thu Jun 22 18:50:55 2017 +0530 SOLR-10496: Fix ClassCastException because the SRC_NODE hint is a Set commit ebf298329360240014253daf58ab4699f3685033 Author: Shalin Shekhar Mangar <shalin@apache.org> Date: Thu Jun 22 18:46:28 2017 +0530 SOLR-10496: Initial patch for ComputePlanAction
This commit is contained in:
parent
148865f020
commit
767335da19
|
@ -164,6 +164,9 @@ New Features
|
||||||
|
|
||||||
* SOLR-10406: v2 API error messages list the URL request path as /solr/____v2/... when the original path was /v2/... (Cao Manh Dat, noble)
|
* SOLR-10406: v2 API error messages list the URL request path as /solr/____v2/... when the original path was /v2/... (Cao Manh Dat, noble)
|
||||||
|
|
||||||
|
* SOLR-10496: New ComputePlanAction for autoscaling which uses the policy framework to compute cluster
|
||||||
|
operations upon a trigger fire. (Noble Paul, shalin)
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
----------------------
|
----------------------
|
||||||
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
|
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides additional context for the TriggerAction such as the trigger instance on
|
||||||
|
* which the action is being executed as well as helper methods to pass computed information along
|
||||||
|
* to the next action
|
||||||
|
*/
|
||||||
|
public class ActionContext {
|
||||||
|
|
||||||
|
private final CoreContainer coreContainer;
|
||||||
|
private final AutoScaling.Trigger source;
|
||||||
|
private final Map<String, Object> properties;
|
||||||
|
|
||||||
|
public ActionContext(CoreContainer coreContainer, AutoScaling.Trigger source, Map<String, Object> properties) {
|
||||||
|
this.coreContainer = coreContainer;
|
||||||
|
this.source = source;
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CoreContainer getCoreContainer() {
|
||||||
|
return coreContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AutoScaling.Trigger getSource() {
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getProperties() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getProperty(String name) {
|
||||||
|
return properties != null ? properties.get(name) : null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -129,7 +129,7 @@ public class AutoScaling {
|
||||||
* Called before a trigger is scheduled. Any heavy object creation or initialisation should
|
* Called before a trigger is scheduled. Any heavy object creation or initialisation should
|
||||||
* be done in this method instead of the Trigger's constructor.
|
* be done in this method instead of the Trigger's constructor.
|
||||||
*/
|
*/
|
||||||
public void init();
|
void init();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TriggerFactory implements Closeable {
|
public static class TriggerFactory implements Closeable {
|
||||||
|
|
|
@ -18,12 +18,30 @@
|
||||||
package org.apache.solr.cloud.autoscaling;
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* todo nocommit
|
* todo nocommit
|
||||||
*/
|
*/
|
||||||
public class ComputePlanAction implements TriggerAction {
|
public class ComputePlanAction implements TriggerAction {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private Map<String, String> initArgs;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
@ -31,21 +49,77 @@ public class ComputePlanAction implements TriggerAction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Map<String, String> args) {
|
public void init(Map<String, String> args) {
|
||||||
|
this.initArgs = args;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return null;
|
return initArgs.get("name");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext context) {
|
||||||
return null;
|
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
|
||||||
|
CoreContainer container = context.getCoreContainer();
|
||||||
|
try {
|
||||||
|
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
|
||||||
|
.withZkHost(container.getZkController().getZkServerAddress())
|
||||||
|
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
|
||||||
|
.build()) {
|
||||||
|
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
|
||||||
|
Map<String, Object> autoScalingConf = Utils.getJson(zkStateReader.getZkClient(), ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
|
||||||
|
if (autoScalingConf.isEmpty()) {
|
||||||
|
log.error("Action: " + getName() + " executed but no policy is configured");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AutoScalingConfig config = new AutoScalingConfig(autoScalingConf);
|
||||||
|
Policy policy = config.getPolicy();
|
||||||
|
Policy.Session session = policy.createSession(new SolrClientDataProvider(cloudSolrClient));
|
||||||
|
Policy.Suggester suggester = getSuggester(session, event);
|
||||||
|
while (true) {
|
||||||
|
SolrRequest operation = suggester.getOperation();
|
||||||
|
if (operation == null) break;
|
||||||
|
log.info("Computed Plan: {}", operation);
|
||||||
|
Map<String, Object> props = context.getProperties();
|
||||||
|
props.compute("operations", (k, v) -> {
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) v;
|
||||||
|
if (operations == null) operations = new ArrayList<>();
|
||||||
|
operations.add(operation);
|
||||||
|
return operations;
|
||||||
|
});
|
||||||
|
session = suggester.getSession();
|
||||||
|
suggester = getSuggester(session, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
log.error("ZooKeeperException while processing event: " + event, e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Interrupted while processing event: " + event, e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("IOException while processing event: " + event, e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Unexpected exception while processing event: " + event, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event) {
|
||||||
public void process(TriggerEvent event) {
|
Policy.Suggester suggester;
|
||||||
|
switch (event.getEventType()) {
|
||||||
|
case NODEADDED:
|
||||||
|
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) event;
|
||||||
|
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
|
||||||
|
.hint(Policy.Suggester.Hint.TARGET_NODE, nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
|
||||||
|
log.debug("Created suggester with targetNode: {}", nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
|
||||||
|
break;
|
||||||
|
case NODELOST:
|
||||||
|
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) event;
|
||||||
|
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
|
||||||
|
.hint(Policy.Suggester.Hint.SRC_NODE, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
|
||||||
|
log.debug("Created suggester with srcNode: {}", nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("No support for events other than nodeAdded and nodeLost, received: " + event.getEventType());
|
||||||
|
}
|
||||||
|
return suggester;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,12 +40,7 @@ public class ExecutePlanAction implements TriggerAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,12 +40,7 @@ public class LogPlanAction implements TriggerAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
||||||
this.zkController = zkController;
|
this.zkController = zkController;
|
||||||
zkStateReader = zkController.getZkStateReader();
|
zkStateReader = zkController.getZkStateReader();
|
||||||
zkClient = zkController.getZkClient();
|
zkClient = zkController.getZkClient();
|
||||||
scheduledTriggers = new ScheduledTriggers(zkClient);
|
scheduledTriggers = new ScheduledTriggers(zkController);
|
||||||
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
|
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -37,10 +38,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.apache.solr.cloud.ActionThrottle;
|
import org.apache.solr.cloud.ActionThrottle;
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
|
import org.apache.solr.cloud.ZkController;
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
import org.apache.solr.common.util.IOUtils;
|
import org.apache.solr.common.util.IOUtils;
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.Op;
|
import org.apache.zookeeper.Op;
|
||||||
|
@ -80,7 +83,9 @@ public class ScheduledTriggers implements Closeable {
|
||||||
|
|
||||||
private final Overseer.Stats queueStats;
|
private final Overseer.Stats queueStats;
|
||||||
|
|
||||||
public ScheduledTriggers(SolrZkClient zkClient) {
|
private final CoreContainer coreContainer;
|
||||||
|
|
||||||
|
public ScheduledTriggers(ZkController zkController) {
|
||||||
// todo make the core pool size configurable
|
// todo make the core pool size configurable
|
||||||
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
|
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
|
||||||
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
|
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
|
||||||
|
@ -93,7 +98,8 @@ public class ScheduledTriggers implements Closeable {
|
||||||
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
||||||
// todo make the wait time configurable
|
// todo make the wait time configurable
|
||||||
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
|
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
|
||||||
this.zkClient = zkClient;
|
this.coreContainer = zkController.getCoreContainer();
|
||||||
|
this.zkClient = zkController.getZkClient();
|
||||||
queueStats = new Overseer.Stats();
|
queueStats = new Overseer.Stats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,9 +156,10 @@ public class ScheduledTriggers implements Closeable {
|
||||||
// let the action executor thread wait instead of the trigger thread so we use the throttle here
|
// let the action executor thread wait instead of the trigger thread so we use the throttle here
|
||||||
actionThrottle.minimumWaitBetweenActions();
|
actionThrottle.minimumWaitBetweenActions();
|
||||||
actionThrottle.markAttemptingAction();
|
actionThrottle.markAttemptingAction();
|
||||||
|
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
|
||||||
for (TriggerAction action : actions) {
|
for (TriggerAction action : actions) {
|
||||||
try {
|
try {
|
||||||
action.process(event);
|
action.process(event, actionContext);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
|
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
|
||||||
throw e;
|
throw e;
|
||||||
|
|
|
@ -26,9 +26,7 @@ import org.apache.solr.util.plugin.MapInitializedPlugin;
|
||||||
*/
|
*/
|
||||||
public interface TriggerAction extends MapInitializedPlugin, Closeable {
|
public interface TriggerAction extends MapInitializedPlugin, Closeable {
|
||||||
// todo nocommit
|
// todo nocommit
|
||||||
public String getName();
|
String getName();
|
||||||
|
|
||||||
public String getClassName();
|
void process(TriggerEvent event, ActionContext context);
|
||||||
|
|
||||||
public void process(TriggerEvent event);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,231 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import com.google.common.base.Charsets;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for {@link ComputePlanAction}
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||||
|
public class ComputePlanActionTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static final AtomicBoolean fired = new AtomicBoolean(false);
|
||||||
|
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
private static final AtomicReference<Object> eventContextRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
private String path = null;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(1)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
fired.set(false);
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
eventContextRef.set(null);
|
||||||
|
this.path = "/admin/autoscaling";
|
||||||
|
|
||||||
|
// remove everything from autoscaling.json in ZK
|
||||||
|
zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(Charsets.UTF_8), true);
|
||||||
|
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String setClusterPolicyCommand = "{" +
|
||||||
|
" 'set-cluster-policy': [" +
|
||||||
|
" {'cores':'<10', 'node':'#ANY'}," +
|
||||||
|
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
|
||||||
|
" {'nodeRole':'overseer', 'replica':0}" +
|
||||||
|
" ]" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setClusterPolicyCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
String setClusterPreferencesCommand = "{" +
|
||||||
|
"'set-cluster-preferences': [" +
|
||||||
|
"{'minimize': 'cores','precision': 3}," +
|
||||||
|
"{'maximize': 'freedisk','precision': 100}]" +
|
||||||
|
"}";
|
||||||
|
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setClusterPreferencesCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeLost() throws Exception {
|
||||||
|
// let's start a node so that we have at least two
|
||||||
|
JettySolrRunner runner = cluster.startJettySolrRunner();
|
||||||
|
String node = runner.getNodeName();
|
||||||
|
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_lost_trigger'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '1s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
|
||||||
|
"{'name':'test','class':'" + ComputePlanActionTest.AssertingTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testNodeLost",
|
||||||
|
1, 2);
|
||||||
|
create.setMaxShardsPerNode(1);
|
||||||
|
create.process(solrClient);
|
||||||
|
|
||||||
|
waitForState("Timed out waiting for replicas of new collection to be active",
|
||||||
|
"testNodeLost", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
|
||||||
|
|
||||||
|
ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
|
||||||
|
DocCollection collection = clusterState.getCollection("testNodeLost");
|
||||||
|
List<Replica> replicas = collection.getReplicas(node);
|
||||||
|
assertNotNull(replicas);
|
||||||
|
assertFalse(replicas.isEmpty());
|
||||||
|
|
||||||
|
// start another node because because when the other node goes away, the cluster policy requires only
|
||||||
|
// 1 replica per node and none on the overseer
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
cluster.waitForAllNodes(30);
|
||||||
|
|
||||||
|
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||||
|
JettySolrRunner jettySolrRunner = cluster.getJettySolrRunners().get(i);
|
||||||
|
if (jettySolrRunner == runner) {
|
||||||
|
cluster.stopJettySolrRunner(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cluster.waitForAllNodes(30);
|
||||||
|
|
||||||
|
assertTrue("Trigger was not fired even after 5 seconds", triggerFiredLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(fired.get());
|
||||||
|
Map context = (Map) eventContextRef.get();
|
||||||
|
assertNotNull(context);
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
|
||||||
|
assertNotNull("The operations computed by ComputePlanAction should not be null", operations);
|
||||||
|
assertEquals("ComputePlanAction should have computed exactly 1 operation", 1, operations.size());
|
||||||
|
SolrRequest solrRequest = operations.get(0);
|
||||||
|
SolrParams params = solrRequest.getParams();
|
||||||
|
assertEquals("Expected MOVEREPLICA action after adding node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
|
||||||
|
String replicaToBeMoved = params.get("replica");
|
||||||
|
assertEquals("Unexpected node in computed operation", replicas.get(0).getName(), replicaToBeMoved);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeAdded() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_trigger'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '1s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
|
||||||
|
"{'name':'test','class':'" + ComputePlanActionTest.AssertingTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testNodeAdded",
|
||||||
|
1, 2);
|
||||||
|
create.setMaxShardsPerNode(2);
|
||||||
|
create.process(solrClient);
|
||||||
|
|
||||||
|
waitForState("Timed out waiting for replicas of new collection to be active",
|
||||||
|
"testNodeAdded", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
|
||||||
|
|
||||||
|
JettySolrRunner runner = cluster.startJettySolrRunner();
|
||||||
|
assertTrue("Trigger was not fired even after 5 seconds", triggerFiredLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(fired.get());
|
||||||
|
Map context = (Map) eventContextRef.get();
|
||||||
|
assertNotNull(context);
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
|
||||||
|
assertNotNull("The operations computed by ComputePlanAction should not be null", operations);
|
||||||
|
assertEquals("ComputePlanAction should have computed exactly 1 operation", 1, operations.size());
|
||||||
|
SolrRequest request = operations.get(0);
|
||||||
|
SolrParams params = request.getParams();
|
||||||
|
assertEquals("Expected MOVEREPLICA action after adding node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
|
||||||
|
String nodeAdded = params.get("targetNode");
|
||||||
|
assertEquals("Unexpected node in computed operation", runner.getNodeName(), nodeAdded);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AssertingTriggerAction implements TriggerAction {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext context) {
|
||||||
|
if (fired.compareAndSet(false, true)) {
|
||||||
|
eventContextRef.set(context.getProperties());
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Map<String, String> args) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -176,12 +176,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return getClass().getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -190,12 +190,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return getClass().getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -252,7 +252,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
|
private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TriggerEvent event) {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
boolean locked = lock.tryLock();
|
boolean locked = lock.tryLock();
|
||||||
if (!locked) {
|
if (!locked) {
|
||||||
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
||||||
|
@ -584,12 +584,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return this.getClass().getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
try {
|
try {
|
||||||
if (triggerFired.compareAndSet(false, true)) {
|
if (triggerFired.compareAndSet(false, true)) {
|
||||||
events.add(event);
|
events.add(event);
|
||||||
|
@ -630,12 +625,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return this.getClass().getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
log.info("-- event: " + event);
|
log.info("-- event: " + event);
|
||||||
events.add(event);
|
events.add(event);
|
||||||
getActionStarted().countDown();
|
getActionStarted().countDown();
|
||||||
|
@ -816,12 +806,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getClassName() {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
return this.getClass().getName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(TriggerEvent event) {
|
|
||||||
boolean locked = lock.tryLock();
|
boolean locked = lock.tryLock();
|
||||||
if (!locked) {
|
if (!locked) {
|
||||||
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class SolrClientDataProvider implements ClusterDataProvider, MapWriter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||||
return data.getOrDefault(node, Collections.emptyMap());//todo fill other details
|
return data.computeIfAbsent(node, s -> Collections.emptyMap());//todo fill other details
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -42,6 +42,7 @@ class AddReplicaSuggester extends Suggester {
|
||||||
Integer targetNodeIndex = null;
|
Integer targetNodeIndex = null;
|
||||||
for (int i = getMatrix().size() - 1; i >= 0; i--) {
|
for (int i = getMatrix().size() - 1; i >= 0; i--) {
|
||||||
Row row = getMatrix().get(i);
|
Row row = getMatrix().get(i);
|
||||||
|
if(!row.isLive) continue;
|
||||||
if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
|
if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
|
||||||
Row tmpRow = row.addReplica(coll, shard);
|
Row tmpRow = row.addReplica(coll, shard);
|
||||||
tmpRow.violations.clear();
|
tmpRow.violations.clear();
|
||||||
|
|
|
@ -57,6 +57,7 @@ public class MoveReplicaSuggester extends Suggester {
|
||||||
final int i = getMatrix().indexOf(fromRow);
|
final int i = getMatrix().indexOf(fromRow);
|
||||||
for (int j = getMatrix().size() - 1; j > i; j--) {
|
for (int j = getMatrix().size() - 1; j > i; j--) {
|
||||||
Row targetRow = getMatrix().get(j);
|
Row targetRow = getMatrix().get(j);
|
||||||
|
if(!targetRow.isLive) continue;
|
||||||
if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue;
|
if (!isAllowed(targetRow.node, Hint.TARGET_NODE)) continue;
|
||||||
targetRow = targetRow.addReplica(coll, shard);
|
targetRow = targetRow.addReplica(coll, shard);
|
||||||
targetRow.violations.clear();
|
targetRow.violations.clear();
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.function.Predicate;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -378,6 +379,14 @@ public class Policy implements MapWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
|
||||||
|
if (srcNodes != null && !srcNodes.isEmpty()) {
|
||||||
|
// the source node is dead so live nodes may not have it
|
||||||
|
for (String srcNode : srcNodes) {
|
||||||
|
if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
|
||||||
|
session.matrix.add(new Row(srcNode, session.getPolicy().params, session.dataProvider));
|
||||||
|
}
|
||||||
|
}
|
||||||
session.applyRules();
|
session.applyRules();
|
||||||
originalViolations.addAll(session.getViolations());
|
originalViolations.addAll(session.getViolations());
|
||||||
this.operation = init();
|
this.operation = init();
|
||||||
|
|
|
@ -53,6 +53,9 @@ class Preference implements MapWriter {
|
||||||
// recursive, it uses the precision to tie & when there is a tie use the next preference to compare
|
// recursive, it uses the precision to tie & when there is a tie use the next preference to compare
|
||||||
// in non-recursive mode, precision is not taken into consideration and sort is done on actual value
|
// in non-recursive mode, precision is not taken into consideration and sort is done on actual value
|
||||||
int compare(Row r1, Row r2, boolean useApprox) {
|
int compare(Row r1, Row r2, boolean useApprox) {
|
||||||
|
if (!r1.isLive && !r2.isLive) return 0;
|
||||||
|
if (!r1.isLive) return -1;
|
||||||
|
if (!r2.isLive) return 1;
|
||||||
Object o1 = useApprox ? r1.cells[idx].approxVal : r1.cells[idx].val;
|
Object o1 = useApprox ? r1.cells[idx].approxVal : r1.cells[idx].val;
|
||||||
Object o2 = useApprox ? r2.cells[idx].approxVal : r2.cells[idx].val;
|
Object o2 = useApprox ? r2.cells[idx].approxVal : r2.cells[idx].val;
|
||||||
int result = 0;
|
int result = 0;
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -39,13 +40,15 @@ class Row implements MapWriter {
|
||||||
Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
|
Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
|
||||||
List<Clause> violations = new ArrayList<>();
|
List<Clause> violations = new ArrayList<>();
|
||||||
boolean anyValueMissing = false;
|
boolean anyValueMissing = false;
|
||||||
|
boolean isLive = true;
|
||||||
|
|
||||||
Row(String node, List<String> params, ClusterDataProvider dataProvider) {
|
Row(String node, List<String> params, ClusterDataProvider dataProvider) {
|
||||||
collectionVsShardVsReplicas = dataProvider.getReplicaInfo(node, params);
|
collectionVsShardVsReplicas = dataProvider.getReplicaInfo(node, params);
|
||||||
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
|
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
|
||||||
this.node = node;
|
this.node = node;
|
||||||
cells = new Cell[params.size()];
|
cells = new Cell[params.size()];
|
||||||
Map<String, Object> vals = dataProvider.getNodeValues(node, params);
|
isLive = dataProvider.getNodes().contains(node);
|
||||||
|
Map<String, Object> vals = isLive ? dataProvider.getNodeValues(node, params) : Collections.emptyMap();
|
||||||
for (int i = 0; i < params.size(); i++) {
|
for (int i = 0; i < params.size(); i++) {
|
||||||
String s = params.get(i);
|
String s = params.get(i);
|
||||||
cells[i] = new Cell(i, s, Clause.validate(s,vals.get(s), false));
|
cells[i] = new Cell(i, s, Clause.validate(s,vals.get(s), false));
|
||||||
|
@ -54,8 +57,10 @@ class Row implements MapWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Row(String node, Cell[] cells, boolean anyValueMissing, Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, List<Clause> violations) {
|
Row(String node, Cell[] cells, boolean anyValueMissing, Map<String,
|
||||||
|
Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, List<Clause> violations, boolean isLive) {
|
||||||
this.node = node;
|
this.node = node;
|
||||||
|
this.isLive = isLive;
|
||||||
this.cells = new Cell[cells.length];
|
this.cells = new Cell[cells.length];
|
||||||
for (int i = 0; i < this.cells.length; i++) {
|
for (int i = 0; i < this.cells.length; i++) {
|
||||||
this.cells[i] = cells[i].copy();
|
this.cells[i] = cells[i].copy();
|
||||||
|
@ -75,7 +80,7 @@ class Row implements MapWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
Row copy() {
|
Row copy() {
|
||||||
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), new ArrayList<>(violations));
|
return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), new ArrayList<>(violations), isLive);
|
||||||
}
|
}
|
||||||
|
|
||||||
Object getVal(String name) {
|
Object getVal(String name) {
|
||||||
|
@ -95,7 +100,9 @@ class Row implements MapWriter {
|
||||||
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
|
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
|
||||||
replicas.add(new ReplicaInfo("" + new Random().nextInt(1000) + 1000, coll, shard, new HashMap<>()));
|
replicas.add(new ReplicaInfo("" + new Random().nextInt(1000) + 1000, coll, shard, new HashMap<>()));
|
||||||
for (Cell cell : row.cells) {
|
for (Cell cell : row.cells) {
|
||||||
if (cell.name.equals("cores")) cell.val = ((Number) cell.val).longValue() + 1;
|
if (cell.name.equals("cores")) {
|
||||||
|
cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return row;
|
return row;
|
||||||
|
|
||||||
|
@ -108,7 +115,9 @@ class Row implements MapWriter {
|
||||||
List<ReplicaInfo> s = c.get(shard);
|
List<ReplicaInfo> s = c.get(shard);
|
||||||
if (s == null || s.isEmpty()) return null;
|
if (s == null || s.isEmpty()) return null;
|
||||||
for (Cell cell : row.cells) {
|
for (Cell cell : row.cells) {
|
||||||
if (cell.name.equals("cores")) cell.val = ((Number) cell.val).longValue() -1;
|
if (cell.name.equals("cores")) {
|
||||||
|
cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return new Pair(row, s.remove(0));
|
return new Pair(row, s.remove(0));
|
||||||
|
|
||||||
|
|
|
@ -188,9 +188,124 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
||||||
assertFalse(c.tag.isPass("12.6"));
|
assertFalse(c.tag.isPass("12.6"));
|
||||||
assertFalse(c.tag.isPass(12.6d));
|
assertFalse(c.tag.isPass(12.6d));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNodeLost() {
|
||||||
|
String dataproviderdata = " {'liveNodes':[" +
|
||||||
|
" '127.0.0.1:65417_solr'," +
|
||||||
|
" '127.0.0.1:65434_solr']," +
|
||||||
|
" 'replicaInfo':{" +
|
||||||
|
" '127.0.0.1:65427_solr':{'testNodeLost':{'shard1':[{'core_node2':{}}]}}," +
|
||||||
|
" '127.0.0.1:65417_solr':{'testNodeLost':{'shard1':[{'core_node1':{}}]}}," +
|
||||||
|
" '127.0.0.1:65434_solr':{}}," +
|
||||||
|
" 'nodeValues':{" +
|
||||||
|
" '127.0.0.1:65417_solr':{" +
|
||||||
|
" 'node':'127.0.0.1:65417_solr'," +
|
||||||
|
" 'cores':1," +
|
||||||
|
" 'freedisk':884.7097854614258}," +
|
||||||
|
" '127.0.0.1:65434_solr':{" +
|
||||||
|
" 'node':'127.0.0.1:65434_solr'," +
|
||||||
|
" 'cores':0," +
|
||||||
|
" 'freedisk':884.7097854614258}}}";
|
||||||
|
/* String stateJson = "{'testNodeLost':{" +
|
||||||
|
" 'pullReplicas':'0'," +
|
||||||
|
" 'replicationFactor':'2'," +
|
||||||
|
" 'router':{'name':'compositeId'}," +
|
||||||
|
" 'maxShardsPerNode':'1'," +
|
||||||
|
" 'autoAddReplicas':'false'," +
|
||||||
|
" 'nrtReplicas':'2'," +
|
||||||
|
" 'tlogReplicas':'0'," +
|
||||||
|
" 'shards':{'shard1':{" +
|
||||||
|
" 'range':'80000000-7fffffff'," +
|
||||||
|
" 'state':'active'," +
|
||||||
|
" 'replicas':{" +
|
||||||
|
" 'core_node1':{" +
|
||||||
|
" 'core':'testNodeLost_shard1_replica_n1'," +
|
||||||
|
" 'base_url':'http://127.0.0.1:65417/solr'," +
|
||||||
|
" 'node_name':'127.0.0.1:65417_solr'," +
|
||||||
|
" 'state':'active'," +
|
||||||
|
" 'type':'NRT'," +
|
||||||
|
" 'leader':'true'}," +
|
||||||
|
" 'core_node2':{" +
|
||||||
|
" 'core':'testNodeLost_shard1_replica_n2'," +
|
||||||
|
" 'base_url':'http://127.0.0.1:65427/solr'," +
|
||||||
|
" 'node_name':'127.0.0.1:65427_solr'," +
|
||||||
|
" 'state':'down'," +
|
||||||
|
" 'type':'NRT'}}}}}}";*/
|
||||||
|
|
||||||
|
String autoScalingjson = "{" +
|
||||||
|
" 'cluster-policy':[" +
|
||||||
|
" {" +
|
||||||
|
" 'cores':'<10'," +
|
||||||
|
" 'node':'#ANY'}," +
|
||||||
|
" {" +
|
||||||
|
" 'replica':'<2'," +
|
||||||
|
" 'shard':'#EACH'," +
|
||||||
|
" 'node':'#ANY'}," +
|
||||||
|
" {" +
|
||||||
|
" 'nodeRole':'overseer'," +
|
||||||
|
" 'replica':0}]," +
|
||||||
|
" 'cluster-preferences':[" +
|
||||||
|
" {" +
|
||||||
|
" 'minimize':'cores'," +
|
||||||
|
" 'precision':3}," +
|
||||||
|
" {" +
|
||||||
|
" 'maximize':'freedisk'," +
|
||||||
|
" 'precision':100}]}";
|
||||||
|
|
||||||
|
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
|
||||||
|
Policy.Session session = policy.createSession(dataProviderWithData(dataproviderdata));
|
||||||
|
SolrRequest op = session.getSuggester(MOVEREPLICA).hint(Hint.SRC_NODE, "127.0.0.1:65427_solr").getOperation();
|
||||||
|
assertNotNull(op);
|
||||||
|
assertEquals( "127.0.0.1:65434_solr",op.getParams().get("targetNode") );
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ClusterDataProvider dataProviderWithData(String data){
|
||||||
|
final Map m = (Map) Utils.fromJSONString(data);
|
||||||
|
Map replicaInfo = (Map) m.get("replicaInfo");
|
||||||
|
replicaInfo.forEach((node, val) -> {
|
||||||
|
Map m1 = (Map) val;
|
||||||
|
m1.forEach((coll, val2) -> {
|
||||||
|
Map m2 = (Map) val2;
|
||||||
|
m2.forEach((shard, val3) -> {
|
||||||
|
List l3 = (List) val3;
|
||||||
|
for (int i = 0; i < l3.size(); i++) {
|
||||||
|
Object o = l3.get(i);
|
||||||
|
Map m3 = (Map) o;
|
||||||
|
l3.set(i, new Policy.ReplicaInfo(m3.keySet().iterator().next().toString()
|
||||||
|
,coll.toString(), shard.toString(),new HashMap<>()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
});
|
||||||
|
return new ClusterDataProvider(){
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
|
||||||
|
return (Map<String, Object>) Utils.getObjectByPath(m,false, Arrays.asList("nodeValues", node));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
|
||||||
|
return (Map<String, Map<String, List<Policy.ReplicaInfo>>>) Utils.getObjectByPath(m,false, Arrays.asList("replicaInfo", node));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<String> getNodes() {
|
||||||
|
return (Collection<String>) m.get("liveNodes");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPolicyNameByCollection(String coll) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void testRow() {
|
public void testRow() {
|
||||||
Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), new ArrayList<>());
|
Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), new ArrayList<>(), true);
|
||||||
Row r1 = row.addReplica("c1", "s1");
|
Row r1 = row.addReplica("c1", "s1");
|
||||||
Row r2 = r1.addReplica("c1", "s1");
|
Row r2 = r1.addReplica("c1", "s1");
|
||||||
assertEquals(1, r1.collectionVsShardVsReplicas.get("c1").get("s1").size());
|
assertEquals(1, r1.collectionVsShardVsReplicas.get("c1").get("s1").size());
|
||||||
|
|
Loading…
Reference in New Issue