Fix Watcher binary compatibility between ES 1.5 and 1.6

Original commit: elastic/x-pack-elasticsearch@a367d016aa
This commit is contained in:
Martijn van Groningen 2015-05-11 15:32:31 +02:00
parent 6320f39023
commit 66c032f6df
10 changed files with 61 additions and 39 deletions

View File

@ -284,10 +284,15 @@ public class HistoryStore extends AbstractComponent implements NodeSettingsServi
} }
putUpdateLock.lock(); putUpdateLock.lock();
try { try {
BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes(); BytesReference source = XContentFactory.jsonBuilder().value(watchRecord).bytes();
IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value()) IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value())
.source(bytes, true)
.version(watchRecord.version()); .version(watchRecord.version());
// TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly
if (source.hasArray()) {
request.source(source.array(), source.arrayOffset(), source.length());
} else {
request.source(source.toBytes());
}
IndexResponse response = client.index(request); IndexResponse response = client.index(request);
watchRecord.version(response.getVersion()); watchRecord.version(response.getVersion());
logger.debug("successfully updated watch record [{}]", watchRecord); logger.debug("successfully updated watch record [{}]", watchRecord);

View File

@ -75,7 +75,11 @@ public final class WatcherUtils {
XContentHelper.writeRawField("template", requestPrototype.source(), builder, ToXContent.EMPTY_PARAMS); XContentHelper.writeRawField("template", requestPrototype.source(), builder, ToXContent.EMPTY_PARAMS);
builder.field("params", watcherContextParams); builder.field("params", watcherContextParams);
builder.endObject(); builder.endObject();
request.templateSource(builder.bytes(), false); // Unfortunately because of SearchRequest#templateSource(BytesReference, boolean) has been removed in 1.6 and
// SearchRequest#templateSource(BytesReference) doesn't exist in 1.5, we are forced to use SearchRequest#templateSource(String)
// that exist in both 1.5 and 1.6
// TODO (2.0 upgrade): move back to BytesReference
request.templateSource(builder.string());
} else if (Strings.hasLength(requestPrototype.templateSource())) { } else if (Strings.hasLength(requestPrototype.templateSource())) {
// Here we convert watcher template into a ES core templates. Due to the different format we use, we // Here we convert watcher template into a ES core templates. Due to the different format we use, we
// convert to the template format used in ES core // convert to the template format used in ES core
@ -107,8 +111,11 @@ public final class WatcherUtils {
params.putAll(template.getParams()); params.putAll(template.getParams());
builder.field("params", params); builder.field("params", params);
builder.endObject(); builder.endObject();
// Unfortunately because of SearchRequest#templateSource(BytesReference, boolean) has been removed in 1.6 and
request.templateSource(builder.bytes(), false); // SearchRequest#templateSource(BytesReference) doesn't exist in 1.5, we are forced to use SearchRequest#templateSource(String)
// that exist in both 1.5 and 1.6
// TODO (2.0 upgrade): move back to BytesReference
request.templateSource(builder.string());
} }
} else if (requestPrototype.templateName() != null) { } else if (requestPrototype.templateName() != null) {
// In Watcher templates on all places can be defined in one format // In Watcher templates on all places can be defined in one format
@ -125,7 +132,7 @@ public final class WatcherUtils {
*/ */
public static SearchRequest readSearchRequest(XContentParser parser, SearchType searchType) throws IOException { public static SearchRequest readSearchRequest(XContentParser parser, SearchType searchType) throws IOException {
BytesReference searchBody = null; BytesReference searchBody = null;
BytesReference templateBody = null; String templateBody = null;
IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
@ -208,7 +215,7 @@ public final class WatcherUtils {
} else if (TEMPLATE_FIELD.match(currentFieldName)) { } else if (TEMPLATE_FIELD.match(currentFieldName)) {
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent()); XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
builder.copyCurrentStructure(parser); builder.copyCurrentStructure(parser);
templateBody = builder.bytes(); templateBody = builder.string();
} else { } else {
throw new SearchRequestParseException("could not read search request. unexpected object field [" + currentFieldName + "]"); throw new SearchRequestParseException("could not read search request. unexpected object field [" + currentFieldName + "]");
} }
@ -238,10 +245,16 @@ public final class WatcherUtils {
searchRequest.searchType(searchType); searchRequest.searchType(searchType);
searchRequest.indicesOptions(indicesOptions); searchRequest.indicesOptions(indicesOptions);
if (searchBody != null) { if (searchBody != null) {
searchRequest.source(searchBody, false); // TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly
assert searchBody.hasArray();
searchRequest.source(searchBody.array(), searchBody.arrayOffset(), searchBody.length());
} }
if (templateBody != null) { if (templateBody != null) {
searchRequest.templateSource(templateBody, false); // Unfortunately because of SearchRequest#templateSource(BytesReference, boolean) has been removed in 1.6 and
// SearchRequest#templateSource(BytesReference) doesn't exist in 1.5, we are forced to use SearchRequest#templateSource(String)
// that exist in both 1.5 and 1.6
// TODO (2.0 upgrade): move back to BytesReference
searchRequest.templateSource(templateBody);
} }
return searchRequest; return searchRequest;
} }

View File

@ -183,7 +183,12 @@ public class WatchStore extends AbstractComponent {
IndexRequest createIndexRequest(String id, BytesReference source, long version) { IndexRequest createIndexRequest(String id, BytesReference source, long version) {
IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id); IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id);
indexRequest.listenerThreaded(false); indexRequest.listenerThreaded(false);
indexRequest.source(source, false); // TODO (2.0 upgrade): move back to BytesReference instead of dealing with the array directly
if (source.hasArray()) {
indexRequest.source(source.array(), source.arrayOffset(), source.length());
} else {
indexRequest.source(source.toBytes());
}
indexRequest.version(version); indexRequest.version(version);
return indexRequest; return indexRequest;
} }

View File

@ -77,7 +77,7 @@ public class WebhookActionTests extends ElasticsearchTestCase {
@Before @Before
public void init() throws IOException { public void init() throws Exception {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new ThreadPool(ThreadPool.Names.SAME);
Settings settings = ImmutableSettings.EMPTY; Settings settings = ImmutableSettings.EMPTY;
scriptService = WatcherTestUtils.getScriptServiceProxy(tp); scriptService = WatcherTestUtils.getScriptServiceProxy(tp);

View File

@ -7,14 +7,7 @@ package org.elasticsearch.watcher.condition.script;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
@ -23,19 +16,16 @@ import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.watcher.test.WatcherTestUtils.mockExecutionContext; import static org.elasticsearch.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -50,14 +40,7 @@ public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTests
@Before @Before
public void init() throws Exception { public void init() throws Exception {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new ThreadPool(ThreadPool.Names.SAME);
Settings settings = ImmutableSettings.settingsBuilder() scriptService = WatcherTestUtils.getScriptServiceProxy(tp);
.put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none")
.build();
GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings);
Set<ScriptEngineService> engineServiceSet = new HashSet<>();
engineServiceSet.add(groovyScriptEngineService);
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);
scriptService = ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService));
} }
@After @After

View File

@ -119,7 +119,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
Template.file(text).params(params).build(), Template.file(text).params(params).build(),
Template.indexed(text).params(params).build() Template.indexed(text).params(params).build()
); );
expectedRequest.templateSource(jsonBuilder().startObject().field("template", template).endObject().bytes(), false); expectedRequest.templateSource(jsonBuilder().startObject().field("template", template).endObject().string());
} }
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();

View File

@ -68,6 +68,7 @@ import org.elasticsearch.watcher.watch.Watch;
import javax.mail.internet.AddressException; import javax.mail.internet.AddressException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.*; import java.util.*;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -98,8 +99,9 @@ public final class WatcherTestUtils {
} }
public static SearchRequest matchAllRequest(IndicesOptions indicesOptions) { public static SearchRequest matchAllRequest(IndicesOptions indicesOptions) {
// TODO (2.0 upgrade): move back to BytesReference, instead of converting to a string
SearchRequest request = new SearchRequest(Strings.EMPTY_ARRAY) SearchRequest request = new SearchRequest(Strings.EMPTY_ARRAY)
.source(SearchSourceBuilder.searchSource().query(matchAllQuery()).buildAsBytes(XContentType.JSON), false); .source(SearchSourceBuilder.searchSource().query(matchAllQuery()).buildAsBytes(XContentType.JSON).toUtf8());
if (indicesOptions != null) { if (indicesOptions != null) {
request.indicesOptions(indicesOptions); request.indicesOptions(indicesOptions);
} }
@ -202,7 +204,7 @@ public final class WatcherTestUtils {
new Watch.Status()); new Watch.Status());
} }
public static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) throws IOException { public static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) throws Exception {
Settings settings = ImmutableSettings.settingsBuilder() Settings settings = ImmutableSettings.settingsBuilder()
.put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none") .put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none")
.build(); .build();
@ -212,7 +214,21 @@ public final class WatcherTestUtils {
engineServiceSet.add(mustacheScriptEngineService); engineServiceSet.add(mustacheScriptEngineService);
engineServiceSet.add(groovyScriptEngineService); engineServiceSet.add(groovyScriptEngineService);
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings); NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);
return ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService));
// TODO (2.0 upgrade): remove this reflection hack:
Class scriptServiceClass = ScriptService.class;
Constructor scriptServiceConstructor = scriptServiceClass.getConstructors()[0];
if (scriptServiceConstructor.getParameterTypes().length == 5) {
return ScriptServiceProxy.of((ScriptService) scriptServiceConstructor.newInstance(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService));
} else if (scriptServiceConstructor.getParameterTypes().length == 6) {
Class scriptContextRegistryClass = Class.forName("org.elasticsearch.script.ScriptContextRegistry");
Constructor scriptContextRegistryConstructor = scriptContextRegistryClass.getDeclaredConstructors()[0];
scriptContextRegistryConstructor.setAccessible(true);
Object scriptContextRegistry = scriptContextRegistryConstructor.newInstance(Collections.emptyList());
return ScriptServiceProxy.of((ScriptService) scriptServiceConstructor.newInstance(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService, scriptContextRegistry));
} else {
throw new RuntimeException("ScriptService is supposed to have 5 or 6 parameters in its constructor");
}
} }
public static SearchType getRandomSupportedSearchType() { public static SearchType getRandomSupportedSearchType() {

View File

@ -318,7 +318,8 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
.value(Template.indexed("my-template").build()) .value(Template.indexed("my-template").build())
.bytes(); .bytes();
SearchRequest searchRequest = newInputSearchRequest("events"); SearchRequest searchRequest = newInputSearchRequest("events");
searchRequest.templateSource(templateSource, false); // TODO (2.0 upgrade): move back to BytesReference instead of coverting to a string
searchRequest.templateSource(templateSource.toUtf8());
testConditionSearch(searchRequest); testConditionSearch(searchRequest);
} }

View File

@ -65,7 +65,6 @@ public class HistoryTemplateEmailMappingsTests extends AbstractWatcherIntegratio
} }
return ImmutableSettings.builder() return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal)) .put(super.nodeSettings(nodeOrdinal))
.put("script.disable_dynamic", false)
// email // email
.put("watcher.actions.email.service.account.test.smtp.auth", true) .put("watcher.actions.email.service.account.test.smtp.auth", true)

View File

@ -340,8 +340,8 @@ public class WatchTests extends ElasticsearchTestCase {
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine))); list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine)));
} }
if (randomBoolean()) { if (randomBoolean()) {
IndexAction aciton = new IndexAction("_index", "_type"); IndexAction action = new IndexAction("_index", "_type");
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomTransform(), new ExecutableIndexAction(aciton, logger, client))); list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomTransform(), new ExecutableIndexAction(action, logger, client)));
} }
if (randomBoolean()) { if (randomBoolean()) {
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000)) HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))