mirror of https://github.com/apache/lucene.git
SOLR-11076 New /autoscaling/history API to return past cluster events and actions.
This commit is contained in:
parent
f10993d26d
commit
98c371f85e
|
@ -22,9 +22,11 @@ import java.io.StringWriter;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
@ -56,6 +58,14 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
public static final String SOURCE_FIELD = "source_s";
|
public static final String SOURCE_FIELD = "source_s";
|
||||||
|
public static final String EVENT_SOURCE_FIELD = "event.source_s";
|
||||||
|
public static final String EVENT_TYPE_FIELD = "event.type_s";
|
||||||
|
public static final String STAGE_FIELD = "stage_s";
|
||||||
|
public static final String ACTION_FIELD = "action_s";
|
||||||
|
public static final String MESSAGE_FIELD = "message_t";
|
||||||
|
public static final String BEFORE_ACTIONS_FIELD = "before.actions_ss";
|
||||||
|
public static final String AFTER_ACTIONS_FIELD = "after.actions_ss";
|
||||||
|
public static final String COLLECTIONS_FIELD = "collections_ss";
|
||||||
public static final String SOURCE = SystemLogListener.class.getSimpleName();
|
public static final String SOURCE = SystemLogListener.class.getSimpleName();
|
||||||
public static final String DOC_TYPE = "autoscaling_event";
|
public static final String DOC_TYPE = "autoscaling_event";
|
||||||
|
|
||||||
|
@ -84,17 +94,17 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
doc.addField(SOURCE_FIELD, SOURCE);
|
doc.addField(SOURCE_FIELD, SOURCE);
|
||||||
doc.addField("id", IdUtils.timeRandomId());
|
doc.addField("id", IdUtils.timeRandomId());
|
||||||
doc.addField("event.id_s", event.getId());
|
doc.addField("event.id_s", event.getId());
|
||||||
doc.addField("event.type_s", event.getEventType().toString());
|
doc.addField(EVENT_TYPE_FIELD, event.getEventType().toString());
|
||||||
doc.addField("event.source_s", event.getSource());
|
doc.addField(EVENT_SOURCE_FIELD, event.getSource());
|
||||||
doc.addField("event.time_l", event.getEventTime());
|
doc.addField("event.time_l", event.getEventTime());
|
||||||
doc.addField("timestamp", new Date());
|
doc.addField("timestamp", new Date());
|
||||||
addMap("event.property.", doc, event.getProperties());
|
addMap("event.property.", doc, event.getProperties());
|
||||||
doc.addField("stage_s", stage.toString());
|
doc.addField(STAGE_FIELD, stage.toString());
|
||||||
if (actionName != null) {
|
if (actionName != null) {
|
||||||
doc.addField("action_s", actionName);
|
doc.addField(ACTION_FIELD, actionName);
|
||||||
}
|
}
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
doc.addField("message_t", message);
|
doc.addField(MESSAGE_FIELD, message);
|
||||||
}
|
}
|
||||||
addError(doc, error);
|
addError(doc, error);
|
||||||
// add JSON versions of event and context
|
// add JSON versions of event and context
|
||||||
|
@ -105,8 +115,8 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
addOperations(doc, (List<SolrRequest>)context.getProperties().get("operations"));
|
addOperations(doc, (List<SolrRequest>)context.getProperties().get("operations"));
|
||||||
// capture specifics of responses after execute_plan action
|
// capture specifics of responses after execute_plan action
|
||||||
addResponses(doc, (List<NamedList<Object>>)context.getProperties().get("responses"));
|
addResponses(doc, (List<NamedList<Object>>)context.getProperties().get("responses"));
|
||||||
addActions("before", doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.BEFORE_ACTION.toString()));
|
addActions(BEFORE_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.BEFORE_ACTION.toString()));
|
||||||
addActions("after", doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.AFTER_ACTION.toString()));
|
addActions(AFTER_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.AFTER_ACTION.toString()));
|
||||||
String contextJson = Utils.toJSONString(context);
|
String contextJson = Utils.toJSONString(context);
|
||||||
doc.addField("context_str", contextJson);
|
doc.addField("context_str", contextJson);
|
||||||
}
|
}
|
||||||
|
@ -124,11 +134,11 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addActions(String prefix, SolrInputDocument doc, List<String> actions) {
|
private void addActions(String field, SolrInputDocument doc, List<String> actions) {
|
||||||
if (actions == null) {
|
if (actions == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
actions.forEach(a -> doc.addField(prefix + ".actions_ss", a));
|
actions.forEach(a -> doc.addField(field, a));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addMap(String prefix, SolrInputDocument doc, Map<String, Object> map) {
|
private void addMap(String prefix, SolrInputDocument doc, Map<String, Object> map) {
|
||||||
|
@ -147,11 +157,15 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
if (operations == null || operations.isEmpty()) {
|
if (operations == null || operations.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Set<String> collections = new HashSet<>();
|
||||||
for (SolrRequest req : operations) {
|
for (SolrRequest req : operations) {
|
||||||
SolrParams params = req.getParams();
|
SolrParams params = req.getParams();
|
||||||
if (params == null) {
|
if (params == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (params.get(CollectionAdminParams.COLLECTION) != null) {
|
||||||
|
collections.add(params.get(CollectionAdminParams.COLLECTION));
|
||||||
|
}
|
||||||
// build a whitespace-separated param string
|
// build a whitespace-separated param string
|
||||||
StringJoiner paramJoiner = new StringJoiner(" ");
|
StringJoiner paramJoiner = new StringJoiner(" ");
|
||||||
paramJoiner.setEmptyValue("");
|
paramJoiner.setEmptyValue("");
|
||||||
|
@ -167,6 +181,9 @@ public class SystemLogListener extends TriggerListenerBase {
|
||||||
doc.addField("operations.params_ts", paramString);
|
doc.addField("operations.params_ts", paramString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!collections.isEmpty()) {
|
||||||
|
doc.addField(COLLECTIONS_FIELD, collections);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addResponses(SolrInputDocument doc, List<NamedList<Object>> responses) {
|
private void addResponses(SolrInputDocument doc, List<NamedList<Object>> responses) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.core;
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
|
import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
|
||||||
import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
|
import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
|
||||||
|
import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
|
||||||
import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
|
import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
|
||||||
import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
|
import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
|
||||||
import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
|
import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
|
||||||
|
@ -78,6 +79,7 @@ import org.apache.solr.core.backup.repository.BackupRepository;
|
||||||
import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
|
import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
|
||||||
import org.apache.solr.handler.RequestHandlerBase;
|
import org.apache.solr.handler.RequestHandlerBase;
|
||||||
import org.apache.solr.handler.SnapShooter;
|
import org.apache.solr.handler.SnapShooter;
|
||||||
|
import org.apache.solr.handler.admin.AutoscalingHistoryHandler;
|
||||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||||
import org.apache.solr.handler.admin.ConfigSetsHandler;
|
import org.apache.solr.handler.admin.ConfigSetsHandler;
|
||||||
import org.apache.solr.handler.admin.CoreAdminHandler;
|
import org.apache.solr.handler.admin.CoreAdminHandler;
|
||||||
|
@ -189,6 +191,8 @@ public class CoreContainer {
|
||||||
|
|
||||||
protected MetricsCollectorHandler metricsCollectorHandler;
|
protected MetricsCollectorHandler metricsCollectorHandler;
|
||||||
|
|
||||||
|
protected AutoscalingHistoryHandler autoscalingHistoryHandler;
|
||||||
|
|
||||||
|
|
||||||
// Bits for the state variable.
|
// Bits for the state variable.
|
||||||
public final static long LOAD_COMPLETE = 0x1L;
|
public final static long LOAD_COMPLETE = 0x1L;
|
||||||
|
@ -532,6 +536,7 @@ public class CoreContainer {
|
||||||
coreAdminHandler = createHandler(CORES_HANDLER_PATH, cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
|
coreAdminHandler = createHandler(CORES_HANDLER_PATH, cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
|
||||||
configSetsHandler = createHandler(CONFIGSETS_HANDLER_PATH, cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
|
configSetsHandler = createHandler(CONFIGSETS_HANDLER_PATH, cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
|
||||||
metricsHandler = createHandler(METRICS_PATH, MetricsHandler.class.getName(), MetricsHandler.class);
|
metricsHandler = createHandler(METRICS_PATH, MetricsHandler.class.getName(), MetricsHandler.class);
|
||||||
|
autoscalingHistoryHandler = createHandler(AUTOSCALING_HISTORY_PATH, AutoscalingHistoryHandler.class.getName(), AutoscalingHistoryHandler.class);
|
||||||
metricsCollectorHandler = createHandler(MetricsCollectorHandler.HANDLER_PATH, MetricsCollectorHandler.class.getName(), MetricsCollectorHandler.class);
|
metricsCollectorHandler = createHandler(MetricsCollectorHandler.HANDLER_PATH, MetricsCollectorHandler.class.getName(), MetricsCollectorHandler.class);
|
||||||
// may want to add some configuration here in the future
|
// may want to add some configuration here in the future
|
||||||
metricsCollectorHandler.init(null);
|
metricsCollectorHandler.init(null);
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
package org.apache.solr.handler.admin;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.cloud.autoscaling.SystemLogListener;
|
||||||
|
import org.apache.solr.cloud.autoscaling.TriggerEvent;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.params.AutoScalingParams;
|
||||||
|
import org.apache.solr.common.params.CollectionAdminParams;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
import org.apache.solr.handler.RequestHandlerBase;
|
||||||
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
|
import org.apache.solr.security.AuthorizationContext;
|
||||||
|
import org.apache.solr.security.PermissionNameProvider;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This handler makes it easier to retrieve a history of autoscaling events from the .system
|
||||||
|
* collection.
|
||||||
|
*/
|
||||||
|
public class AutoscalingHistoryHandler extends RequestHandlerBase implements PermissionNameProvider {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
public static final String SYSTEM_COLLECTION_PARAM = "systemCollection";
|
||||||
|
|
||||||
|
public static final String ACTION_PARAM = "action";
|
||||||
|
public static final String MESSAGE_PARAM = "message";
|
||||||
|
public static final String TRIGGER_PARAM = AutoScalingParams.TRIGGER;
|
||||||
|
public static final String TYPE_PARAM = "eventType";
|
||||||
|
public static final String NODE_PARAM = "node";
|
||||||
|
public static final String COLLECTION_PARAM = CollectionAdminParams.COLLECTION;
|
||||||
|
public static final String STAGE_PARAM = AutoScalingParams.STAGE;
|
||||||
|
public static final String BEFORE_ACTION_PARAM = AutoScalingParams.BEFORE_ACTION;
|
||||||
|
public static final String AFTER_ACTION_PARAM = AutoScalingParams.AFTER_ACTION;
|
||||||
|
|
||||||
|
private static final String EVENTS_FQ = "{!term f=" + CommonParams.TYPE + "}" + SystemLogListener.DOC_TYPE;
|
||||||
|
|
||||||
|
private static final String ACTION_FQ_FORMAT = "{!term f=" + SystemLogListener.ACTION_FIELD + "}%s";
|
||||||
|
private static final String MESSAGE_FQ_FORMAT = "{!lucene}" + SystemLogListener.MESSAGE_FIELD + ":%s";
|
||||||
|
private static final String TRIGGER_FQ_FORMAT = "{!term f=" + SystemLogListener.EVENT_SOURCE_FIELD + "}%s";
|
||||||
|
private static final String STAGE_FQ_FORMAT = "{!term f=" + SystemLogListener.STAGE_FIELD + "}%s";
|
||||||
|
private static final String COLLECTION_FQ_FORMAT = "{!term f=" + SystemLogListener.COLLECTIONS_FIELD + "}%s";
|
||||||
|
private static final String TYPE_FQ_FORMAT = "{!term f=" + SystemLogListener.EVENT_TYPE_FIELD + "}%s";
|
||||||
|
private static final String NODE_FQ_FORMAT = "{!term f=event.property." + TriggerEvent.NODE_NAME + "_s}%s";
|
||||||
|
private static final String BEFORE_ACTION_FQ_FORMAT = "{!term f=" + SystemLogListener.BEFORE_ACTIONS_FIELD + "}%s";
|
||||||
|
private static final String AFTER_ACTION_FQ_FORMAT = "{!term f=" + SystemLogListener.AFTER_ACTIONS_FIELD + "}%s";
|
||||||
|
|
||||||
|
private static final Map<String, String> formats = new HashMap<String, String>() {{
|
||||||
|
put(ACTION_PARAM, ACTION_FQ_FORMAT);
|
||||||
|
put(MESSAGE_PARAM, MESSAGE_FQ_FORMAT);
|
||||||
|
put(TRIGGER_PARAM, TRIGGER_FQ_FORMAT);
|
||||||
|
put(TYPE_PARAM, TYPE_FQ_FORMAT);
|
||||||
|
put(STAGE_PARAM, STAGE_FQ_FORMAT);
|
||||||
|
put(NODE_PARAM, NODE_FQ_FORMAT);
|
||||||
|
put(COLLECTION_PARAM, COLLECTION_FQ_FORMAT);
|
||||||
|
put(BEFORE_ACTION_PARAM, BEFORE_ACTION_FQ_FORMAT);
|
||||||
|
put(AFTER_ACTION_PARAM, AFTER_ACTION_FQ_FORMAT);
|
||||||
|
}};
|
||||||
|
|
||||||
|
private final CoreContainer coreContainer;
|
||||||
|
|
||||||
|
|
||||||
|
public AutoscalingHistoryHandler(CoreContainer coreContainer) {
|
||||||
|
this.coreContainer = coreContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Name getPermissionName(AuthorizationContext request) {
|
||||||
|
return Name.AUTOSCALING_HISTORY_READ_PERM;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
|
||||||
|
String collection = params.get(SYSTEM_COLLECTION_PARAM, CollectionAdminParams.SYSTEM_COLL);
|
||||||
|
params.remove(SYSTEM_COLLECTION_PARAM);
|
||||||
|
params.remove(CommonParams.QT);
|
||||||
|
// check that we have the main query, if not then use *:*
|
||||||
|
if (params.get(CommonParams.Q) == null) {
|
||||||
|
params.add(CommonParams.Q, "*:*");
|
||||||
|
}
|
||||||
|
// sort by doc id, which are time-based, unless specified otherwise
|
||||||
|
if (params.get(CommonParams.SORT) == null) {
|
||||||
|
params.add(CommonParams.SORT, "id asc");
|
||||||
|
}
|
||||||
|
// filter query to pick only autoscaling events
|
||||||
|
params.remove(CommonParams.FQ, EVENTS_FQ);
|
||||||
|
params.add(CommonParams.FQ, EVENTS_FQ);
|
||||||
|
// add filters translated from simplified parameters
|
||||||
|
for (Map.Entry<String, String> e : formats.entrySet()) {
|
||||||
|
String[] values = params.remove(e.getKey());
|
||||||
|
if (values != null) {
|
||||||
|
for (String value : values) {
|
||||||
|
params.add(CommonParams.FQ, String.format(e.getValue(), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
|
||||||
|
.withZkHost(coreContainer.getZkController().getZkServerAddress())
|
||||||
|
.withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient())
|
||||||
|
.build()) {
|
||||||
|
QueryResponse qr = cloudSolrClient.query(collection, params);
|
||||||
|
rsp.getValues().add("response", qr.getResults());
|
||||||
|
} catch (Exception e) {
|
||||||
|
if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) {
|
||||||
|
// relatively benign
|
||||||
|
LOG.info("Collection " + collection + " does not exist.");
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return "A handler to return autoscaling event history";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Category getCategory() {
|
||||||
|
return Category.ADMIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -49,6 +49,7 @@ public interface PermissionNameProvider {
|
||||||
METRICS_READ_PERM("metrics-read", null),
|
METRICS_READ_PERM("metrics-read", null),
|
||||||
AUTOSCALING_READ_PERM("autoscaling-read", null),
|
AUTOSCALING_READ_PERM("autoscaling-read", null),
|
||||||
AUTOSCALING_WRITE_PERM("autoscaling-write", null),
|
AUTOSCALING_WRITE_PERM("autoscaling-write", null),
|
||||||
|
AUTOSCALING_HISTORY_READ_PERM("autoscaling-history-read", null),
|
||||||
ALL("all", unmodifiableSet(new HashSet<>(asList("*", null))))
|
ALL("all", unmodifiableSet(new HashSet<>(asList("*", null))))
|
||||||
;
|
;
|
||||||
final String name;
|
final String name;
|
||||||
|
|
|
@ -0,0 +1,275 @@
|
||||||
|
package org.apache.solr.handler.admin;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.cloud.autoscaling.ActionContext;
|
||||||
|
import org.apache.solr.cloud.autoscaling.SystemLogListener;
|
||||||
|
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
|
||||||
|
import org.apache.solr.cloud.autoscaling.TriggerEvent;
|
||||||
|
import org.apache.solr.common.SolrDocument;
|
||||||
|
import org.apache.solr.common.SolrDocumentList;
|
||||||
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.apache.solr.common.params.CollectionAdminParams;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch actionFiredLatch;
|
||||||
|
private static CloudSolrClient solrClient;
|
||||||
|
private static String PREFIX = AutoscalingHistoryHandlerTest.class.getSimpleName();
|
||||||
|
|
||||||
|
private static CountDownLatch getActionFiredLatch() {
|
||||||
|
return actionFiredLatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(3)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
solrClient = cluster.getSolrClient();
|
||||||
|
CollectionAdminRequest.createCollection(PREFIX + "_collection", null, 1, 3)
|
||||||
|
.process(solrClient);
|
||||||
|
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 3)
|
||||||
|
.process(solrClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TesterAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext context) {
|
||||||
|
getActionFiredLatch().countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupTest() throws Exception {
|
||||||
|
actionFiredLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
// first trigger
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : '" + PREFIX + "_node_added_trigger'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'compute_plan','class':'solr.ComputePlanAction'}," +
|
||||||
|
"{'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
|
||||||
|
"{'name':'test','class':'" + TesterAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// second trigger
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : '" + PREFIX + "_node_lost_trigger'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'compute_plan','class':'solr.ComputePlanAction'}," +
|
||||||
|
"{'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
|
||||||
|
"{'name':'test','class':'" + TesterAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// remove default listeners
|
||||||
|
String removeListenerCommand = "{\n" +
|
||||||
|
"\t\"remove-listener\" : {\n" +
|
||||||
|
"\t\t\"name\" : \"" + PREFIX + "_node_lost_trigger.system\"\n" +
|
||||||
|
"\t}\n" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, removeListenerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
removeListenerCommand = "{\n" +
|
||||||
|
"\t\"remove-listener\" : {\n" +
|
||||||
|
"\t\t\"name\" : \"" + PREFIX + "_node_added_trigger.system\"\n" +
|
||||||
|
"\t}\n" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, removeListenerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
// set up our own listeners
|
||||||
|
String setListenerCommand = "{" +
|
||||||
|
"'set-listener' : " +
|
||||||
|
"{" +
|
||||||
|
"'name' : 'node_added'," +
|
||||||
|
"'trigger' : '" + PREFIX + "_node_added_trigger'," +
|
||||||
|
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
|
||||||
|
"'beforeAction' : ['compute_plan','execute_plan','test']," +
|
||||||
|
"'afterAction' : ['compute_plan','execute_plan','test']," +
|
||||||
|
"'class' : '" + SystemLogListener.class.getName() + "'" +
|
||||||
|
"}" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
setListenerCommand = "{" +
|
||||||
|
"'set-listener' : " +
|
||||||
|
"{" +
|
||||||
|
"'name' : 'node_lost'," +
|
||||||
|
"'trigger' : '" + PREFIX + "_node_lost_trigger'," +
|
||||||
|
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
|
||||||
|
"'beforeAction' : ['compute_plan','execute_plan','test']," +
|
||||||
|
"'afterAction' : ['compute_plan','execute_plan','test']," +
|
||||||
|
"'class' : '" + SystemLogListener.class.getName() + "'" +
|
||||||
|
"}" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetLatch() {
|
||||||
|
actionFiredLatch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHistory() throws Exception {
|
||||||
|
JettySolrRunner jetty = cluster.startJettySolrRunner();
|
||||||
|
String nodeAddedName = jetty.getNodeName();
|
||||||
|
boolean await = actionFiredLatch.await(60, TimeUnit.SECONDS);
|
||||||
|
assertTrue("action did not execute", await);
|
||||||
|
// commit on the history collection
|
||||||
|
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
|
||||||
|
// verify that new docs exist
|
||||||
|
ModifiableSolrParams query = params(CommonParams.Q, "type:" + SystemLogListener.DOC_TYPE,
|
||||||
|
CommonParams.FQ, "event.source_s:" + PREFIX + "_node_added_trigger");
|
||||||
|
QueryResponse resp = solrClient.query(CollectionAdminParams.SYSTEM_COLL, query);
|
||||||
|
SolrDocumentList docs = resp.getResults();
|
||||||
|
assertNotNull(docs);
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.TRIGGER_PARAM, PREFIX + "_node_added_trigger");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(8, docs.size());
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.STAGE_PARAM, "STARTED");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(1, docs.size());
|
||||||
|
assertEquals("NODEADDED", docs.get(0).getFieldValue("event.type_s"));
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.NODE_PARAM, nodeAddedName);
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(8, docs.size());
|
||||||
|
for (SolrDocument doc : docs) {
|
||||||
|
assertEquals(nodeAddedName, doc.getFieldValue("event.property.nodeName_s"));
|
||||||
|
}
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.ACTION_PARAM, "test");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(2, docs.size());
|
||||||
|
assertEquals("BEFORE_ACTION", docs.get(0).getFieldValue("stage_s"));
|
||||||
|
assertEquals("AFTER_ACTION", docs.get(1).getFieldValue("stage_s"));
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.ACTION_PARAM, "test");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(2, docs.size());
|
||||||
|
assertEquals("BEFORE_ACTION", docs.get(0).getFieldValue("stage_s"));
|
||||||
|
assertEquals("AFTER_ACTION", docs.get(1).getFieldValue("stage_s"));
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.COLLECTION_PARAM, CollectionAdminParams.SYSTEM_COLL);
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(5, docs.size());
|
||||||
|
assertEquals("AFTER_ACTION", docs.get(0).getFieldValue("stage_s"));
|
||||||
|
assertEquals("compute_plan", docs.get(0).getFieldValue("action_s"));
|
||||||
|
|
||||||
|
// reset latch
|
||||||
|
resetLatch();
|
||||||
|
|
||||||
|
// kill a node
|
||||||
|
cluster.stopJettySolrRunner(0);
|
||||||
|
await = actionFiredLatch.await(60, TimeUnit.SECONDS);
|
||||||
|
// wait for recovery
|
||||||
|
waitForRecovery(PREFIX + "_collection");
|
||||||
|
Thread.sleep(5000);
|
||||||
|
// commit on the history collection
|
||||||
|
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.TRIGGER_PARAM, PREFIX + "_node_lost_trigger");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(8, docs.size());
|
||||||
|
|
||||||
|
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||||
|
AutoscalingHistoryHandler.TRIGGER_PARAM, PREFIX + "_node_lost_trigger",
|
||||||
|
AutoscalingHistoryHandler.COLLECTION_PARAM, PREFIX + "_collection");
|
||||||
|
docs = solrClient.query(query).getResults();
|
||||||
|
assertEquals(5, docs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForRecovery(String collection) throws Exception {
|
||||||
|
boolean recovered = false;
|
||||||
|
for (int i = 0; i < 300; i++) {
|
||||||
|
ClusterState state = solrClient.getZkStateReader().getClusterState();
|
||||||
|
DocCollection collState = getCollectionState(collection);
|
||||||
|
log.debug("###### " + collState);
|
||||||
|
Collection<Replica> replicas = collState.getReplicas();
|
||||||
|
boolean allActive = true;
|
||||||
|
boolean hasLeaders = true;
|
||||||
|
if (replicas != null && !replicas.isEmpty()) {
|
||||||
|
for (Replica r : replicas) {
|
||||||
|
if (!r.isActive(state.getLiveNodes())) {
|
||||||
|
log.info("Not active: " + r);
|
||||||
|
allActive = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
allActive = false;
|
||||||
|
}
|
||||||
|
for (Slice slice : collState.getSlices()) {
|
||||||
|
if (slice.getLeader() == null) {
|
||||||
|
hasLeaders = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (allActive && hasLeaders) {
|
||||||
|
recovered = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
log.info("--- waiting, allActive=" + allActive + ", hasLeaders=" + hasLeaders);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("replica never fully recovered", recovered);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -184,6 +184,7 @@ public interface CommonParams {
|
||||||
String ZK_PATH = "/admin/zookeeper";
|
String ZK_PATH = "/admin/zookeeper";
|
||||||
String METRICS_PATH = "/admin/metrics";
|
String METRICS_PATH = "/admin/metrics";
|
||||||
String AUTOSCALING_PATH = "/admin/autoscaling";
|
String AUTOSCALING_PATH = "/admin/autoscaling";
|
||||||
|
String AUTOSCALING_HISTORY_PATH = "/admin/autoscaling/history";
|
||||||
String AUTOSCALING_DIAGNOSTICS_PATH = "/admin/autoscaling/diagnostics";
|
String AUTOSCALING_DIAGNOSTICS_PATH = "/admin/autoscaling/diagnostics";
|
||||||
|
|
||||||
String STATUS = "status";
|
String STATUS = "status";
|
||||||
|
@ -200,6 +201,7 @@ public interface CommonParams {
|
||||||
AUTHZ_PATH,
|
AUTHZ_PATH,
|
||||||
METRICS_PATH,
|
METRICS_PATH,
|
||||||
AUTOSCALING_PATH,
|
AUTOSCALING_PATH,
|
||||||
|
AUTOSCALING_HISTORY_PATH,
|
||||||
AUTOSCALING_DIAGNOSTICS_PATH)));
|
AUTOSCALING_DIAGNOSTICS_PATH)));
|
||||||
String APISPEC_LOCATION = "apispec/";
|
String APISPEC_LOCATION = "apispec/";
|
||||||
String INTROSPECT = "/_introspect";
|
String INTROSPECT = "/_introspect";
|
||||||
|
|
Loading…
Reference in New Issue