Core: Simplify the template helper by using the actual put index template api instead of the MetaDataIndexTemplateService directly which allows for code reuse.

Original commit: elastic/x-pack-elasticsearch@1c646f3448
This commit is contained in:
Martijn van Groningen 2014-11-12 22:04:41 +01:00
parent e0741997d0
commit cd2c74a6a0
3 changed files with 36 additions and 95 deletions

View File

@ -52,17 +52,17 @@ public class AlertsStore extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField ACTION_FIELD = new ParseField("actions");
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
public static final ParseField ENABLE = new ParseField("enable");
public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary");
public static final ParseField REQUEST_FIELD = new ParseField("request");
public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary");
public static final ParseField REQUEST_FIELD = new ParseField("request");
private final Client client;
private final ThreadPool threadPool;
private final ConcurrentMap<String,Alert> alertMap;
private final ConcurrentMap<String, Alert> alertMap;
private final AlertActionRegistry alertActionRegistry;
private final TriggerManager triggerManager;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@ -198,10 +198,17 @@ public class AlertsStore extends AbstractComponent {
}
}
} else {
if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) {
if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.LOADING)) {
logger.info("No previous .alert index");
templateHelper.checkAndUploadIndexTemplate(state, "alerts");
listener.onSuccess();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
templateHelper.checkAndUploadIndexTemplate(state, "alerts");
if (AlertsStore.this.state.compareAndSet(State.LOADING, State.STARTED)) {
listener.onSuccess();
}
}
});
}
}
}

View File

@ -5,49 +5,41 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*/
public class TemplateHelper extends AbstractComponent {
private final MetaDataIndexTemplateService indexTemplateService;
private final TransportPutIndexTemplateAction transportPutIndexTemplateAction;
@Inject
public TemplateHelper(Settings settings, MetaDataIndexTemplateService indexTemplateService, TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
public TemplateHelper(Settings settings, TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
super(settings);
this.indexTemplateService = indexTemplateService;
this.transportPutIndexTemplateAction = transportPutIndexTemplateAction;
}
/**
* Checks if the template with the specified name exists and has the expected version.
* If that isn't the case then the template from the classpath will be uploaded to the cluster
* If that isn't the case then the template from the classpath will be uploaded to the cluster.
*
* In the the template doesn't exists this method blocks until the template has been created.
*/
public void checkAndUploadIndexTemplate(ClusterState state, final String templateName) {
final byte[] template;
@ -85,76 +77,11 @@ public class TemplateHelper extends AbstractComponent {
} else {
logger.info("Adding index template [{}], because none was found", templateName);
}
MetaDataIndexTemplateService.PutRequest putRequest = new MetaDataIndexTemplateService.PutRequest("alerts-template", templateName);
XContent xContent = XContentFactory.xContent(template, 0, template.length);
try (XContentParser parser = xContent.createParser(template, 0, template.length)) {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
switch (currentFieldName) {
case "settings":
XContentBuilder settingsBuilder = jsonBuilder();
settingsBuilder.copyCurrentStructure(parser);
String source = settingsBuilder.string();
putRequest.settings(ImmutableSettings.settingsBuilder().loadFromSource(source).build());
break;
case "mappings":
Map<String, String> mappingSource = new HashMap<>();
String currentMappingFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case FIELD_NAME:
currentMappingFieldName = parser.currentName();
break;
case START_OBJECT:
XContentBuilder mappingsBuilder = jsonBuilder();
mappingsBuilder.copyCurrentStructure(parser);
mappingSource.put(currentMappingFieldName, mappingsBuilder.string());
break;
}
}
putRequest.mappings(mappingSource);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unsupported token [" + token + "]");
}
break;
case VALUE_STRING:
if ("template".equals(currentFieldName)) {
putRequest.template(parser.textOrNull());
} else {
throw new ElasticsearchIllegalArgumentException("Unsupported field [" + currentFieldName + "]");
}
break;
case VALUE_NUMBER:
if ("order".equals(currentFieldName)) {
putRequest.order(parser.intValue());
} else {
throw new ElasticsearchIllegalArgumentException("Unsupported field [" + currentFieldName + "]");
}
break;
default:
throw new ElasticsearchIllegalArgumentException("Unsupported token [" + token + "]");
}
}
}
indexTemplateService.putTemplate(putRequest, new MetaDataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetaDataIndexTemplateService.PutResponse response) {
logger.info("Adding template [{}] was successful", templateName);
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to add template [{}]", t, templateName);
}
});
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
// We're already running on the master and TransportPutIndexTemplateAction#executor() is SAME, so it is ok to wait:
ActionFuture<PutIndexTemplateResponse> future = transportPutIndexTemplateAction.execute(request);
PutIndexTemplateResponse response = future.actionGet();
} catch (IOException e) {
// if we're not sure of the template, we can't send data... re-raise exception.
throw new RuntimeException("failed to load/verify index template", e);

View File

@ -123,10 +123,17 @@ public class AlertActionManager extends AbstractComponent {
}
}
} else {
if (this.state.compareAndSet(State.STOPPED, State.STARTED)) {
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart();
listener.onSuccess();
if (this.state.compareAndSet(State.STOPPED, State.LOADING)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STARTED)) {
doStart();
listener.onSuccess();
}
}
});
}
}
}