Integration with Shield

- Change action names to be aligned with Shield. All actions are categorized as `cluster management`.. the read actions (get & stats) are also also categorized as `cluster monitoring`.

- Added `ShieldIntegration` and `WatcherShieldModule` to handle all the integration points.

- We have a new internal shield user `__watcher_user` that will be the actor behind all the watcher interal action executions (managing the `.watches` and `.watch_history` indices

- This integration revealed a bug where the watcher plugin would not wire correctly with transport clients. This is now fixed with the introduction of a dedicated `TransportClientWatcherModule`

- Added docs

Closes elastic/elasticsearch#43

Original commit: elastic/x-pack-elasticsearch@26e9b0da06
This commit is contained in:
uboness 2015-03-25 16:49:18 +01:00
parent 6277a32b91
commit fca9b6a1e6
32 changed files with 661 additions and 229 deletions

87
pom.xml
View File

@ -19,6 +19,8 @@
<elasticsearch.version>1.5.0</elasticsearch.version>
<lucene.maven.version>4.10.4</lucene.maven.version>
<lucene.version>4.10.4</lucene.version>
<shield.version>1.2.1-SNAPSHOT</shield.version>
<license.version>1.0.0</license.version>
<tests.jvms>auto</tests.jvms>
<tests.shuffle>true</tests.shuffle>
@ -45,13 +47,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-gmail</artifactId>
<version>v1-rev23-1.19.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
@ -109,20 +104,27 @@
<scope>test</scope>
</dependency>
<!-- Remove this when LocalDiscovery gets fixed in core -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<scope>test</scope>
</dependency>
<!-- Regular dependencies -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-license-plugin</artifactId>
<version>${license.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-shield</artifactId>
<version>${shield.version}</version>
<optional>true</optional>
</dependency>
<dependency>
@ -175,7 +177,61 @@
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-shield</artifactId>
<version>${shield.version}</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-license-plugin</artifactId>
<version>${license.version}</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
<exclusion>
<groupId>com.spatial4j</groupId>
<artifactId>spatial4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>http://maven.elasticsearch.org/releases</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>elasticsearch-snapshots</id>
<url>http://maven.elasticsearch.org/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
<repository>
<id>maven2-repository.dev.java.net</id>
<name>Java.net Repository for Maven</name>
@ -183,6 +239,7 @@
<layout>default</layout>
</repository>
</repositories>
<build>
<resources>
<resource>

View File

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.watcher.transport.WatcherTransportModule;
public class TransportClientWatcherModule extends AbstractModule implements SpawnModules {
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new WatcherTransportModule());
}
@Override
protected void configure() {
}
}

View File

@ -6,6 +6,11 @@
package org.elasticsearch.watcher;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.actions.ActionModule;
import org.elasticsearch.watcher.client.WatcherClientModule;
import org.elasticsearch.watcher.condition.ConditionModule;
@ -13,21 +18,24 @@ import org.elasticsearch.watcher.history.HistoryModule;
import org.elasticsearch.watcher.input.InputModule;
import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.scheduler.SchedulerModule;
import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.clock.ClockModule;
import org.elasticsearch.watcher.support.init.InitializingModule;
import org.elasticsearch.watcher.support.template.TemplateModule;
import org.elasticsearch.watcher.transform.TransformModule;
import org.elasticsearch.watcher.transport.WatcherTransportModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.watcher.watch.WatchModule;
public class WatcherModule extends AbstractModule implements SpawnModules {
private final Settings settings;
public WatcherModule(Settings settings) {
this.settings = settings;
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(
@ -43,7 +51,8 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
new ConditionModule(),
new InputModule(),
new ActionModule(),
new HistoryModule());
new HistoryModule(),
new WatcherShieldModule(settings));
}
@Override

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.watcher;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.common.collect.ImmutableList;
@ -24,9 +26,11 @@ public class WatcherPlugin extends AbstractPlugin {
public static final String SCHEDULER_THREAD_POOL_NAME = "watcher_scheduler";
private final Settings settings;
private final boolean transportClient;
public WatcherPlugin(Settings settings) {
this.settings = settings;
transportClient = "transport".equals(settings.get(Client.CLIENT_TYPE_SETTING));
}
@Override public String name() {
@ -39,11 +43,16 @@ public class WatcherPlugin extends AbstractPlugin {
@Override
public Collection<Class<? extends Module>> modules() {
return ImmutableList.<Class<? extends Module>>of(WatcherModule.class);
return transportClient ?
ImmutableList.<Class<? extends Module>>of(TransportClientWatcherModule.class) :
ImmutableList.<Class<? extends Module>>of(WatcherModule.class);
}
@Override
public Collection<Class<? extends LifecycleComponent>> services() {
if (transportClient) {
return ImmutableList.of();
}
return ImmutableList.<Class<? extends LifecycleComponent>>of(
// the initialization service must be first in the list
// as other services may depend on one of the initialized
@ -54,6 +63,9 @@ public class WatcherPlugin extends AbstractPlugin {
@Override
public Settings additionalSettings() {
if (transportClient) {
return ImmutableSettings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return settingsBuilder()
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "fixed")

View File

@ -71,7 +71,7 @@ public class IndexAction extends Action<IndexAction.Result> {
}
try {
IndexResponse response = client.index(indexRequest).actionGet();
IndexResponse response = client.index(indexRequest);
Map<String,Object> data = new HashMap<>();
data.put("created", response.isCreated());
data.put("id", response.getId());

View File

@ -60,10 +60,10 @@ public class HistoryStore extends AbstractComponent {
public void put(WatchRecord watchRecord) throws HistoryException {
String index = getHistoryIndexNameForTime(watchRecord.scheduledTime());
try {
IndexResponse response = client.prepareIndex(index, DOC_TYPE, watchRecord.id())
.setSource(XContentFactory.jsonBuilder().value(watchRecord))
.setOpType(IndexRequest.OpType.CREATE)
.get();
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id())
.source(XContentFactory.jsonBuilder().value(watchRecord))
.opType(IndexRequest.OpType.CREATE);
IndexResponse response = client.index(request);
watchRecord.version(response.getVersion());
} catch (IOException e) {
throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e);
@ -74,10 +74,10 @@ public class HistoryStore extends AbstractComponent {
logger.debug("updating watch record [{}]...", watchRecord);
try {
BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes();
IndexResponse response = client.prepareIndex(getHistoryIndexNameForTime(watchRecord.scheduledTime()), DOC_TYPE, watchRecord.id())
.setSource(bytes)
.setVersion(watchRecord.version())
.get();
IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.scheduledTime()), DOC_TYPE, watchRecord.id())
.source(bytes, true)
.version(watchRecord.version());
IndexResponse response = client.index(request);
watchRecord.version(response.getVersion());
logger.debug("successfully updated watch record [{}]", watchRecord);
} catch (IOException e) {

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.shield;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.ShieldPlugin;
import org.elasticsearch.shield.authc.AuthenticationService;
import org.elasticsearch.transport.TransportMessage;
/**
*
*/
public class ShieldIntegration {
private final boolean installed;
private final boolean enabled;
private final Object authcService;
private final Object userHolder;
@Inject
public ShieldIntegration(Settings settings, Injector injector) {
installed = installed(settings);
enabled = installed && ShieldPlugin.shieldEnabled(settings);
authcService = enabled ? injector.getInstance(AuthenticationService.class) : null;
userHolder = enabled ? injector.getInstance(WatcherUserHolder.class) : null;
}
public boolean installed() {
return installed;
}
public boolean enabled() {
return enabled;
}
public void bindWatcherUser(TransportMessage message) {
if (authcService != null) {
((AuthenticationService) authcService).attachUserHeaderIfMissing(message, ((WatcherUserHolder) userHolder).user);
}
}
static boolean installed(Settings settings) {
try {
Class clazz = settings.getClassLoader().loadClass("org.elasticsearch.shield.ShieldPlugin");
return clazz != null;
} catch (ClassNotFoundException e) {
return false;
}
}
static boolean enabled(Settings settings) {
return installed(settings) && ShieldPlugin.shieldEnabled(settings);
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.shield;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.authz.AuthorizationModule;
import org.elasticsearch.shield.authz.Privilege;
/**
*
*/
public class WatcherShieldModule extends AbstractModule implements PreProcessModule {
private final ESLogger logger;
private final boolean enabled;
private final WatcherUserHolder userHolder;
public WatcherShieldModule(Settings settings) {
this.logger = Loggers.getLogger(WatcherShieldModule.class, settings);
this.enabled = ShieldIntegration.enabled(settings);
if (enabled) {
userHolder = new WatcherUserHolder();
registerClusterPrivilege("manage_watcher", "cluster:admin/watcher/*", "cluster:monitor/watcher/*");
registerClusterPrivilege("monitor_watcher", "cluster:monitor/watcher/*");
} else {
userHolder = null;
}
}
void registerClusterPrivilege(String name, String... patterns) {
try {
Privilege.Cluster.addCustom(name, patterns);
} catch (Exception se) {
logger.warn("could not register cluster privilege [{}]", name);
// we need to prevent bubbling the shield exception here for the tests. In the tests
// we create multiple nodes in the same jvm and since the custom cluster is a static binding
// multiple nodes will try to add the same privileges multiple times.
}
}
@Override
public void processModule(Module module) {
if (enabled && module instanceof AuthorizationModule) {
((AuthorizationModule) module).registerReservedRole(userHolder.role);
}
}
@Override
protected void configure() {
bind(ShieldIntegration.class).asEagerSingleton();
bind(WatcherUserHolder.class).toProvider(Providers.of(userHolder));
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.shield;
import org.elasticsearch.shield.User;
import org.elasticsearch.shield.authz.Permission;
import org.elasticsearch.shield.authz.Privilege;
/**
*
*/
public class WatcherUserHolder {
static final String NAME = "__watcher_user";
static final String[] ROLE_NAMES = new String[] { "__watcher_role" };
final Permission.Global.Role role = Permission.Global.Role.builder(ROLE_NAMES[0])
.set(Privilege.Cluster.action("indices:admin/template/put"))
// for now, the watches will be executed under the watcher user, meaning, all actions
// taken as part of the execution will be executed on behalf of this user. this includes
// the index action, search input and search transform. For this reason the watcher user
// requires full access to all indices in the cluster.
//
// at later phases we'll want to execute the watch on behalf of the user who registers
// it. this will require some work to attache/persist that user to/with the watch.
.add(Privilege.Index.ALL, "*")
.build();
final User user = new User.Simple(NAME, ROLE_NAMES);
}

View File

@ -9,14 +9,15 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.watcher.watch.WatchStore;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.shield.ShieldIntegration;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -31,12 +32,14 @@ public class TemplateUtils extends AbstractComponent {
private final static Pattern TEMPLATE_VERSION_PATTERN = Pattern.compile("watcher.template_version\"\\s*:\\s*\"?(\\d+)\"?");
private final TransportPutIndexTemplateAction transportPutIndexTemplateAction;
private final ShieldIntegration shieldIntegration;
private final TransportPutIndexTemplateAction action;
@Inject
public TemplateUtils(Settings settings, TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
public TemplateUtils(Settings settings, TransportPutIndexTemplateAction action, ShieldIntegration shieldIntegration) {
super(settings);
this.transportPutIndexTemplateAction = transportPutIndexTemplateAction;
this.action = action;
this.shieldIntegration = shieldIntegration;
}
/**
@ -83,8 +86,10 @@ public class TemplateUtils extends AbstractComponent {
}
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
shieldIntegration.bindWatcherUser(request);
// We're already running on the master and TransportPutIndexTemplateAction#executor() is SAME, so it is ok to wait:
ActionFuture<PutIndexTemplateResponse> future = transportPutIndexTemplateAction.execute(request);
ActionFuture<PutIndexTemplateResponse> future = action.execute(request);
PutIndexTemplateResponse response = future.actionGet();
} catch (IOException e) {
// if we're not sure of the template, we can't send data... re-raise exception.

View File

@ -6,21 +6,21 @@
package org.elasticsearch.watcher.support.init.proxy;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.watcher.shield.ShieldIntegration;
import org.elasticsearch.watcher.support.init.InitializingService;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
@ -28,13 +28,19 @@ import org.elasticsearch.common.unit.TimeValue;
*/
public class ClientProxy implements InitializingService.Initializable {
private final ShieldIntegration shieldIntegration;
private Client client;
@Inject
public ClientProxy(ShieldIntegration shieldIntegration) {
this.shieldIntegration = shieldIntegration;
}
/**
* Creates a proxy to the given client (can be used for testing)
*/
public static ClientProxy of(Client client) {
ClientProxy proxy = new ClientProxy();
ClientProxy proxy = new ClientProxy(null);
proxy.client = client;
return proxy;
}
@ -48,43 +54,38 @@ public class ClientProxy implements InitializingService.Initializable {
return client.admin();
}
public ActionFuture<IndexResponse> index(IndexRequest request) {
return client.index(request);
}
public void index(IndexRequest request, ActionListener<IndexResponse> listener) {
client.index(request, listener);
}
public IndexRequestBuilder prepareIndex(String index, String type, String id) {
return client.prepareIndex(index, type, id);
public IndexResponse index(IndexRequest request) {
return client.index(preProcess(request)).actionGet();
}
public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
return client.delete(request);
}
public GetRequestBuilder prepareGet(String index, String type, String id) {
return client.prepareGet(index, type, id);
return client.delete(preProcess(request));
}
public SearchResponse search(SearchRequest request) {
return client.search(request).actionGet();
return client.search(preProcess(request)).actionGet();
}
public SearchResponse searchScroll(String scrollId, TimeValue timeout) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout);
return client.searchScroll(request).actionGet();
return client.searchScroll(preProcess(request)).actionGet();
}
public ClearScrollResponse clearScroll(String scrollId) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
return client.clearScroll(request).actionGet();
return client.clearScroll(preProcess(request)).actionGet();
}
public RefreshResponse refresh(RefreshRequest request) {
return client.admin().indices().refresh(request).actionGet();
return client.admin().indices().refresh(preProcess(request)).actionGet();
}
<M extends TransportMessage> M preProcess(M message) {
if (shieldIntegration != null) {
shieldIntegration.bindWatcherUser(message);
}
return message;
}
}

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class AckWatchAction extends WatcherAction<AckWatchRequest, AckWatchResponse, AckWatchRequestBuilder> {
public static final AckWatchAction INSTANCE = new AckWatchAction();
public static final String NAME = "indices:data/write/watch/ack";
public static final String NAME = "cluster:admin/watcher/watch/ack";
private AckWatchAction() {
super(NAME);

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class DeleteWatchAction extends WatcherAction<DeleteWatchRequest, DeleteWatchResponse, DeleteWatchRequestBuilder> {
public static final DeleteWatchAction INSTANCE = new DeleteWatchAction();
public static final String NAME = "indices:data/write/watch/delete";
public static final String NAME = "cluster:admin/watcher/watch/delete";
private DeleteWatchAction() {
super(NAME);

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class GetWatchAction extends WatcherAction<GetWatchRequest, GetWatchResponse, GetWatchRequestBuilder> {
public static final GetWatchAction INSTANCE = new GetWatchAction();
public static final String NAME = "indices:data/read/watch/get";
public static final String NAME = "cluster:monitor/watcher/watch/get";
private GetWatchAction() {
super(NAME);

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class PutWatchAction extends WatcherAction<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> {
public static final PutWatchAction INSTANCE = new PutWatchAction();
public static final String NAME = "indices:data/write/watch/put";
public static final String NAME = "cluster:admin/watcher/watch/put";
private PutWatchAction() {
super(NAME);

View File

@ -27,8 +27,8 @@ public class TransportWatcherServiceAction extends TransportMasterNodeOperationA
private final WatcherLifeCycleService lifeCycleService;
@Inject
public TransportWatcherServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, WatcherLifeCycleService lifeCycleService) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
public TransportWatcherServiceAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, WatcherLifeCycleService lifeCycleService) {
super(settings, WatcherServiceAction.NAME, transportService, clusterService, threadPool, actionFilters);
this.lifeCycleService = lifeCycleService;
}

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class WatcherStatsAction extends WatcherAction<WatcherStatsRequest, WatcherStatsResponse, WatcherStatsRequestBuilder> {
public static final WatcherStatsAction INSTANCE = new WatcherStatsAction();
public static final String NAME = "cluster/watcher/stats";
public static final String NAME = "cluster:monitor/watcher/stats";
private WatcherStatsAction() {
super(NAME);

View File

@ -137,7 +137,7 @@ public class WatchStore extends AbstractComponent {
ensureStarted();
Watch watch = watchParser.parse(name, false, source);
IndexRequest indexRequest = createIndexRequest(name, source);
IndexResponse response = client.index(indexRequest).actionGet();
IndexResponse response = client.index(indexRequest);
watch.status().version(response.getVersion());
Watch previous = watches.put(name, watch);
return new WatchPut(previous, watch, response);
@ -160,7 +160,7 @@ public class WatchStore extends AbstractComponent {
ensureStarted();
assert watch == watches.get(watch.name()) : "update watch can only be applied to an already loaded watch";
BytesReference source = JsonXContent.contentBuilder().value(watch).bytes();
IndexResponse response = client.index(createIndexRequest(watch.name(), source)).actionGet();
IndexResponse response = client.index(createIndexRequest(watch.name(), source));
watch.status().version(response.getVersion());
// Don't need to update the watches, since we are working on an instance from it.
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher;
import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@ -13,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.concurrent.MoreExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -37,7 +37,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
@Before
public void prepareServices() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.sameThreadExecutor());
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watchService = mock(WatchService.class);
clusterService = mock(ClusterService.class);
indicesService = mock(IndicesService.class);

View File

@ -7,16 +7,11 @@ package org.elasticsearch.watcher.condition.script;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
@ -29,6 +24,11 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -42,7 +42,7 @@ import static org.mockito.Mockito.when;
/**
*/
public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests {
public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTests {
private ThreadPool tp = null;
private ScriptServiceProxy scriptService;
@ -67,8 +67,10 @@ public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests {
@Test
public void testExecute_withAggs() throws Exception {
createIndex("my-index", client().admin().indices().prepareCreate("my-index")
.addMapping("my-type", "_timestamp", "enabled=true"));
client().admin().indices().prepareCreate("my-index")
.addMapping("my-type", "_timestamp", "enabled=true")
.get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:00").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:10").setSource("{}").get();

View File

@ -12,7 +12,7 @@ import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.simple.AlwaysTrueCondition;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test;
@ -22,12 +22,12 @@ import static org.hamcrest.Matchers.*;
/**
*/
public class HistoryStoreLifeCycleTest extends AbstractWatcherSingleNodeTests {
public class HistoryStoreLifeCycleTest extends AbstractWatcherIntegrationTests {
@Test
public void testPutLoadUpdate() throws Exception {
Condition condition = new AlwaysTrueCondition(logger);
HistoryStore historyStore = getInstanceFromNode(HistoryStore.class);
HistoryStore historyStore = getInstanceFromMaster(HistoryStore.class);
Watch watch = new Watch("_name", SystemClock.INSTANCE, null, null, condition, null, null, null, null, null);
// Put watch records and verify that these are stored
@ -43,7 +43,7 @@ public class HistoryStoreLifeCycleTest extends AbstractWatcherSingleNodeTests {
}
// Load the stored watch records
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
ClusterService clusterService = getInstanceFromMaster(ClusterService.class);
Collection<WatchRecord> records = historyStore.loadRecords(clusterService.state(), WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, notNullValue());
assertThat(records, hasSize(watchRecords.length));

View File

@ -8,7 +8,6 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -26,7 +25,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit;
@ -43,6 +41,7 @@ import org.junit.Test;
import java.util.Collection;
import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsEqual.equalTo;
@ -76,21 +75,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.metadata()).thenReturn(null);
WatchRecord watchRecord = new WatchRecord(watch, new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
when(builder.setSource(any(XContentBuilder.class))).thenReturn(builder);
when(builder.setOpType(IndexRequest.OpType.CREATE)).thenReturn(builder);
IndexResponse indexResponse = mock(IndexResponse.class);
long version = randomLong();
when(indexResponse.getVersion()).thenReturn(version);
when(builder.get()).thenReturn(indexResponse);
when(clientProxy.prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z")).thenReturn(builder);
when(clientProxy.index(indexRequest(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z", IndexRequest.OpType.CREATE))).thenReturn(indexResponse);
historyStore.put(watchRecord);
assertThat(watchRecord.version(), equalTo(version));
verify(builder, times(1)).setSource(any(XContentBuilder.class));
verify(builder, times(1)).setOpType(IndexRequest.OpType.CREATE);
verify(clientProxy, times(1)).prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z");
}
@Test
@ -103,21 +94,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
WatchRecord watchRecord = new WatchRecord(watch, new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
watchRecord.version(4l);
IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
when(builder.setSource(any(BytesReference.class))).thenReturn(builder);
when(builder.setVersion(4l)).thenReturn(builder);
IndexResponse indexResponse = mock(IndexResponse.class);
long version = randomLong();
when(indexResponse.getVersion()).thenReturn(version);
when(builder.get()).thenReturn(indexResponse);
when(clientProxy.prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z")).thenReturn(builder);
when(clientProxy.index(indexRequest(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z", 4L, null))).thenReturn(indexResponse);
historyStore.update(watchRecord);
assertThat(watchRecord.version(), equalTo(version));
verify(builder, times(1)).setSource(any(BytesReference.class));
verify(builder, times(1)).setVersion(4l);
verify(clientProxy, times(1)).prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z");
}
@Test

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.watcher.scheduler.schedule;
import com.google.common.primitives.Ints;
import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.watcher.scheduler.schedule.IntervalSchedule.Interval.Unit;
import org.elasticsearch.watcher.scheduler.schedule.support.*;
import org.elasticsearch.common.xcontent.ToXContent;

View File

@ -5,17 +5,22 @@
*/
package org.elasticsearch.watcher.test;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.netty.util.internal.SystemPropertyUtil;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -23,7 +28,10 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.shield.ShieldPlugin;
import org.elasticsearch.shield.authc.esusers.ESUsersRealm;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
@ -51,8 +59,10 @@ import org.elasticsearch.watcher.watch.WatchService;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -72,13 +82,21 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
private TimeWarp timeWarp;
boolean shieldEnabled = shieldEnabled();
private TransportClient shieldWatcherTransportClient;
private WatcherClient shieldWatcherClient;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
ImmutableSettings.Builder builder = ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types", timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName())
.build();
.put("plugin.types",
(timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," +
(shieldEnabled ? ShieldPlugin.class.getName() + "," : "") +
LicensePlugin.class.getName())
.put(ShieldSettings.settings(shieldEnabled));
return builder.build();
}
/**
@ -91,8 +109,46 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return timeWarpEnabled;
}
/**
* Override and returns {@code false} to force running without shield
*/
protected boolean shieldEnabled() {
return randomBoolean();
}
@Before
public void setupTimeWarp() throws Exception {
public void _setup() throws Exception {
setupTimeWarp();
startWatcherIfNodesExist();
}
@After
public void _cleanup() throws Exception {
// Clear all internal watcher state for the next test method:
logger.info("[{}#{}]: clearing watches", getTestClass().getSimpleName(), getTestName());
stopWatcher();
if (shieldWatcherTransportClient != null) {
shieldWatcherTransportClient.close();
}
}
@Override
protected Settings transportClientSettings() {
if (!shieldEnabled) {
return ImmutableSettings.builder()
.put(super.transportClientSettings())
.put("plugin.types", WatcherPlugin.class.getName())
.build();
}
return ImmutableSettings.builder()
.put("client.transport.sniff", false)
.put("plugin.types", ShieldPlugin.class.getName() + "," + WatcherPlugin.class.getName())
.put("shield.user", "admin:changeme")
.build();
}
private void setupTimeWarp() throws Exception {
if (timeWarped()) {
timeWarp = new TimeWarp(
internalTestCluster().getInstance(SchedulerMock.class, internalTestCluster().getMasterName()),
@ -100,6 +156,20 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
}
private void startWatcherIfNodesExist() throws Exception {
if (internalTestCluster().size() > 0) {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
if (response.getWatchServiceState() == WatchService.State.STOPPED) {
logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
startWatcher();
} else {
logger.info("[{}#{}]: not starting watcher, because watcher is in state [{}]", getTestClass().getSimpleName(), getTestName(), response.getWatchServiceState());
}
} else {
logger.info("[{}#{}]: not starting watcher, because test cluster has no nodes", getTestClass().getSimpleName(), getTestName());
}
}
protected TimeWarp timeWarp() {
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
return timeWarp;
@ -117,28 +187,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return new WatcherWrappingCluster(seed, testCluster);
}
@Before
public void startWatcherIfNodesExist() throws Exception {
if (internalTestCluster().size() > 0) {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
if (response.getWatchServiceState() == WatchService.State.STOPPED) {
logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
startWatcher();
} else {
logger.info("[{}#{}]: not starting watcher, because watcher is in state [{}]", getTestClass().getSimpleName(), getTestName(), response.getWatchServiceState());
}
} else {
logger.info("[{}#{}]: not starting watcher, because test cluster has no nodes", getTestClass().getSimpleName(), getTestName());
}
}
@After
public void clearWatches() throws Exception {
// Clear all internal watcher state for the next test method:
logger.info("[{}#{}]: clearing watches", getTestClass().getSimpleName(), getTestName());
stopWatcher();
}
protected long docCount(String index, String type, QueryBuilder query) {
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
}
@ -210,16 +258,30 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return builder.bytes();
}
protected <T> T getInstanceFromMaster(Class<T> type) {
return internalTestCluster().getInstance(type, internalTestCluster().getMasterName());
}
protected Watch.Parser watchParser() {
return internalTestCluster().getInstance(Watch.Parser.class, internalTestCluster().getMasterName());
return getInstanceFromMaster(Watch.Parser.class);
}
protected Scheduler scheduler() {
return internalTestCluster().getInstance(Scheduler.class, internalTestCluster().getMasterName());
return getInstanceFromMaster(Scheduler.class);
}
protected WatcherClient watcherClient() {
return internalTestCluster().getInstance(WatcherClient.class);
return shieldEnabled ?
new WatcherClient(internalTestCluster().transportClient()) :
new WatcherClient(client());
// if (shieldEnabled) {
// if (shieldWatcherClient == null) {
// shieldWatcherClient = createShieldWatcherClient();
// }
// return shieldWatcherClient;
// }
// WatcherClient client = internalTestCluster().getInstance(WatcherClient.class);
// return client;
}
protected ScriptServiceProxy scriptService() {
@ -476,4 +538,83 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
}
/** Shield related settings */
static class ShieldSettings {
public static final String IP_FILTER = "allow: all\n";
public static final String USERS =
"test:{plain}changeme\n" +
"admin:{plain}changeme\n" +
"monitor:{plain}changeme";
public static final String USER_ROLES =
"test:test\n" +
"admin:admin\n" +
"monitor:monitor";
public static final String ROLES =
"test:\n" + // a user for the test infra.
" cluster: all\n" +
" indices:\n" +
" '*': all\n" +
"\n" +
"admin:\n" +
" cluster: manage_watcher, cluster:monitor/nodes/info\n" +
"\n" +
"monitor:\n" +
" cluster: monitor_watcher, cluster:monitor/nodes/info\n"
;
static Settings settings(boolean enabled) {
ImmutableSettings.Builder builder = ImmutableSettings.builder();
if (!enabled) {
return builder.put("shield.enabled", false).build();
}
File folder = createFolder(globalTempDir(), "watcher_shield");
return builder.put("shield.enabled", true)
.put("shield.user", "test:changeme")
.put("shield.authc.realms.esusers.type", ESUsersRealm.TYPE)
.put("shield.authc.realms.esusers.order", 0)
.put("shield.authc.realms.esusers.files.users", writeFile(folder, "users", USERS))
.put("shield.authc.realms.esusers.files.users_roles", writeFile(folder, "users_roles", USER_ROLES))
.put("shield.authz.store.files.roles", writeFile(folder, "roles.yml", ROLES))
.put("shield.transport.n2n.ip_filter.file", writeFile(folder, "ip_filter.yml", IP_FILTER))
.put("shield.audit.enabled", true)
.build();
}
static File createFolder(File parent, String name) {
File createdFolder = new File(parent, name);
//the directory might exist e.g. if the global cluster gets restarted, then we recreate the directory as well
if (createdFolder.exists()) {
if (!FileSystemUtils.deleteRecursively(createdFolder)) {
throw new RuntimeException("could not delete existing temporary folder: " + createdFolder.getAbsolutePath());
}
}
if (!createdFolder.mkdir()) {
throw new RuntimeException("could not create temporary folder: " + createdFolder.getAbsolutePath());
}
return createdFolder;
}
static String writeFile(File folder, String name, String content) {
return writeFile(folder, name, content.getBytes(Charsets.UTF_8));
}
static String writeFile(File folder, String name, byte[] content) {
Path file = folder.toPath().resolve(name);
try {
Streams.copy(content, file.toFile());
} catch (IOException e) {
throw new ElasticsearchException("error writing file in test", e);
}
return file.toFile().getAbsolutePath();
}
}
}

View File

@ -1,63 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.elasticsearch.watcher.WatcherLifeCycleService;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
/**
*
*/
public abstract class AbstractWatcherSingleNodeTests extends ElasticsearchSingleNodeTest {
@BeforeClass
public static void initSuite() throws Exception {
getInstanceFromNode(WatcherLifeCycleService.class).start();
}
@AfterClass
public static void cleanupSuite() throws Exception {
getInstanceFromNode(WatcherLifeCycleService.class).stop();
}
@Override
protected boolean resetNodeAfterTest() {
return false;
}
protected IndexResponse index(String index, String type, String id) {
return index(index, type, id, Collections.<String, Object>emptyMap());
}
protected IndexResponse index(String index, String type, String id, Map<String, Object> doc) {
return client().prepareIndex(index, type, id).setSource(doc).get();
}
protected RefreshResponse refresh() {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
assertNoFailures(actionGet);
return actionGet;
}
protected ClientProxy clientProxy() {
return ClientProxy.of(client());
}
protected ScriptServiceProxy scriptService() {
return getInstanceFromNode(ScriptServiceProxy.class);
}
}

View File

@ -45,6 +45,10 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin {
*/
public static class WatcherModule extends org.elasticsearch.watcher.WatcherModule {
public WatcherModule(Settings settings) {
super(settings);
}
@Override
public Iterable<? extends Module> spawnModules() {
List<Module> modules = new ArrayList<>();

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test;
import org.elasticsearch.action.index.IndexRequest;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.mockito.Matchers;
/**
*
*/
public final class WatcherMatchers {
private WatcherMatchers() {
}
public static IndexRequest indexRequest(String index, String type, String id) {
return Matchers.argThat(indexRequestMatcher(index, type, id));
}
public static IndexRequest indexRequest(String index, String type, String id, IndexRequest.OpType opType) {
return Matchers.argThat(indexRequestMatcher(index, type, id).opType(opType));
}
public static IndexRequest indexRequest(String index, String type, String id, Long version, IndexRequest.OpType opType) {
return Matchers.argThat(indexRequestMatcher(index, type, id).version(version).opType(opType));
}
public static IndexRequestMatcher indexRequestMatcher(String index, String type, String id) {
return new IndexRequestMatcher(index, type, id);
}
public static class IndexRequestMatcher extends TypeSafeMatcher<IndexRequest> {
private final String index;
private final String type;
private final String id;
private Long version;
private IndexRequest.OpType opType;
private IndexRequestMatcher(String index, String type, String id) {
this.index = index;
this.type = type;
this.id = id;
}
public IndexRequestMatcher version(long version) {
this.version = version;
return this;
}
public IndexRequestMatcher opType(IndexRequest.OpType opType) {
this.opType = opType;
return this;
}
@Override
protected boolean matchesSafely(IndexRequest request) {
if (!index.equals(request.index()) || !type.equals(request.type()) || !id.equals(request.id())) {
return false;
}
if (version != null && !version.equals(request.version())) {
return false;
}
if (opType != null && !opType.equals(request.opType())) {
return false;
}
return true;
}
@Override
public void describeTo(Description description) {
description.appendText("is index request [" + index + "/" + type + "/" + id + "]");
}
}
}

View File

@ -5,11 +5,18 @@
*/
package org.elasticsearch.watcher.test;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Actions;
import org.elasticsearch.watcher.actions.email.EmailAction;
@ -22,23 +29,19 @@ import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.scheduler.schedule.CronSchedule;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transform.SearchTransform;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import javax.mail.internet.AddressException;
import java.util.ArrayList;
@ -149,4 +152,5 @@ public final class WatcherTestUtils {
new TimeValue(0),
new Watch.Status());
}
}

View File

@ -102,6 +102,10 @@ public class WatcherBenchmark {
public static class WatcherModule extends org.elasticsearch.watcher.WatcherModule {
public WatcherModule(Settings settings) {
super(settings);
}
@Override
public Iterable<? extends Module> spawnModules() {
List<Module> modules = new ArrayList<>();

View File

@ -6,8 +6,11 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.watch.WatchStore;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.scheduler.schedule.IntervalSchedule;
@ -16,12 +19,15 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
@ -30,12 +36,6 @@ import static org.elasticsearch.watcher.scheduler.schedule.Schedules.cron;
import static org.elasticsearch.watcher.scheduler.schedule.Schedules.interval;
import static org.elasticsearch.watcher.support.Variables.*;
import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
/**

View File

@ -50,6 +50,11 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
return false;
}
@Override
protected boolean shieldEnabled() {
return false;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings settings = super.nodeSettings(nodeOrdinal);

View File

@ -8,11 +8,6 @@ package org.elasticsearch.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -20,24 +15,29 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class SearchTransformTests extends AbstractWatcherSingleNodeTests {
public class SearchTransformTests extends AbstractWatcherIntegrationTests {
@Test
public void testApply() throws Exception {