diff --git a/pom.xml b/pom.xml
index b0b75b58932..2dea9dbbedd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,6 +19,8 @@
1.5.0
4.10.4
4.10.4
+ 1.2.1-SNAPSHOT
+ 1.0.0
auto
true
@@ -45,13 +47,6 @@
test
-
- com.google.apis
- google-api-services-gmail
- v1-rev23-1.19.1
- test
-
-
org.codehaus.groovy
groovy-all
@@ -109,20 +104,27 @@
test
-
-
- com.google.guava
- guava
- 18.0
- test
-
-
org.elasticsearch
elasticsearch
${elasticsearch.version}
+ provided
+
+
+
+ org.elasticsearch
+ elasticsearch-license-plugin
+ ${license.version}
+ true
+
+
+
+ org.elasticsearch
+ elasticsearch-shield
+ ${shield.version}
+ true
@@ -175,7 +177,61 @@
+
+
+
+
+ org.elasticsearch
+ elasticsearch-shield
+ ${shield.version}
+
+
+ org.elasticsearch
+ elasticsearch
+
+
+
+
+ org.elasticsearch
+ elasticsearch-license-plugin
+ ${license.version}
+
+
+ org.elasticsearch
+ elasticsearch
+
+
+ com.spatial4j
+ spatial4j
+
+
+
+
+
+
+
+ elasticsearch-releases
+ http://maven.elasticsearch.org/releases
+
+ true
+ daily
+
+
+ false
+
+
+
+ elasticsearch-snapshots
+ http://maven.elasticsearch.org/snapshots
+
+ false
+
+
+ true
+ always
+
+
maven2-repository.dev.java.net
Java.net Repository for Maven
@@ -183,6 +239,7 @@
default
+
diff --git a/src/main/java/org/elasticsearch/watcher/TransportClientWatcherModule.java b/src/main/java/org/elasticsearch/watcher/TransportClientWatcherModule.java
new file mode 100644
index 00000000000..62b025bb0d3
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/TransportClientWatcherModule.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher;
+
+
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.SpawnModules;
+import org.elasticsearch.watcher.transport.WatcherTransportModule;
+
+
+public class TransportClientWatcherModule extends AbstractModule implements SpawnModules {
+
+ @Override
+ public Iterable extends Module> spawnModules() {
+ return ImmutableList.of(new WatcherTransportModule());
+ }
+
+ @Override
+ protected void configure() {
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java
index bab547c0bee..5e6261dd034 100644
--- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java
+++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java
@@ -6,6 +6,11 @@
package org.elasticsearch.watcher;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.SpawnModules;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.actions.ActionModule;
import org.elasticsearch.watcher.client.WatcherClientModule;
import org.elasticsearch.watcher.condition.ConditionModule;
@@ -13,21 +18,24 @@ import org.elasticsearch.watcher.history.HistoryModule;
import org.elasticsearch.watcher.input.InputModule;
import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.scheduler.SchedulerModule;
+import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.clock.ClockModule;
import org.elasticsearch.watcher.support.init.InitializingModule;
import org.elasticsearch.watcher.support.template.TemplateModule;
import org.elasticsearch.watcher.transform.TransformModule;
import org.elasticsearch.watcher.transport.WatcherTransportModule;
-import org.elasticsearch.common.collect.ImmutableList;
-import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.common.inject.Module;
-import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.watcher.watch.WatchModule;
public class WatcherModule extends AbstractModule implements SpawnModules {
+ private final Settings settings;
+
+ public WatcherModule(Settings settings) {
+ this.settings = settings;
+ }
+
@Override
public Iterable extends Module> spawnModules() {
return ImmutableList.of(
@@ -43,7 +51,8 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
new ConditionModule(),
new InputModule(),
new ActionModule(),
- new HistoryModule());
+ new HistoryModule(),
+ new WatcherShieldModule(settings));
}
@Override
diff --git a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java
index 8c9babdf93f..3462b2af839 100644
--- a/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java
+++ b/src/main/java/org/elasticsearch/watcher/WatcherPlugin.java
@@ -5,6 +5,8 @@
*/
package org.elasticsearch.watcher;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.common.collect.ImmutableList;
@@ -24,9 +26,11 @@ public class WatcherPlugin extends AbstractPlugin {
public static final String SCHEDULER_THREAD_POOL_NAME = "watcher_scheduler";
private final Settings settings;
+ private final boolean transportClient;
public WatcherPlugin(Settings settings) {
this.settings = settings;
+ transportClient = "transport".equals(settings.get(Client.CLIENT_TYPE_SETTING));
}
@Override public String name() {
@@ -39,11 +43,16 @@ public class WatcherPlugin extends AbstractPlugin {
@Override
public Collection> modules() {
- return ImmutableList.>of(WatcherModule.class);
+ return transportClient ?
+ ImmutableList.>of(TransportClientWatcherModule.class) :
+ ImmutableList.>of(WatcherModule.class);
}
@Override
public Collection> services() {
+ if (transportClient) {
+ return ImmutableList.of();
+ }
return ImmutableList.>of(
// the initialization service must be first in the list
// as other services may depend on one of the initialized
@@ -54,6 +63,9 @@ public class WatcherPlugin extends AbstractPlugin {
@Override
public Settings additionalSettings() {
+ if (transportClient) {
+ return ImmutableSettings.EMPTY;
+ }
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
return settingsBuilder()
.put("threadpool." + SCHEDULER_THREAD_POOL_NAME + ".type", "fixed")
diff --git a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java
index 8d5af1a01c8..8e46cb8bbab 100644
--- a/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java
+++ b/src/main/java/org/elasticsearch/watcher/actions/index/IndexAction.java
@@ -71,7 +71,7 @@ public class IndexAction extends Action {
}
try {
- IndexResponse response = client.index(indexRequest).actionGet();
+ IndexResponse response = client.index(indexRequest);
Map data = new HashMap<>();
data.put("created", response.isCreated());
data.put("id", response.getId());
diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java
index 8e4a13a3e59..d5393e3a679 100644
--- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java
+++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java
@@ -60,10 +60,10 @@ public class HistoryStore extends AbstractComponent {
public void put(WatchRecord watchRecord) throws HistoryException {
String index = getHistoryIndexNameForTime(watchRecord.scheduledTime());
try {
- IndexResponse response = client.prepareIndex(index, DOC_TYPE, watchRecord.id())
- .setSource(XContentFactory.jsonBuilder().value(watchRecord))
- .setOpType(IndexRequest.OpType.CREATE)
- .get();
+ IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id())
+ .source(XContentFactory.jsonBuilder().value(watchRecord))
+ .opType(IndexRequest.OpType.CREATE);
+ IndexResponse response = client.index(request);
watchRecord.version(response.getVersion());
} catch (IOException e) {
throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e);
@@ -74,10 +74,10 @@ public class HistoryStore extends AbstractComponent {
logger.debug("updating watch record [{}]...", watchRecord);
try {
BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes();
- IndexResponse response = client.prepareIndex(getHistoryIndexNameForTime(watchRecord.scheduledTime()), DOC_TYPE, watchRecord.id())
- .setSource(bytes)
- .setVersion(watchRecord.version())
- .get();
+ IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.scheduledTime()), DOC_TYPE, watchRecord.id())
+ .source(bytes, true)
+ .version(watchRecord.version());
+ IndexResponse response = client.index(request);
watchRecord.version(response.getVersion());
logger.debug("successfully updated watch record [{}]", watchRecord);
} catch (IOException e) {
diff --git a/src/main/java/org/elasticsearch/watcher/shield/ShieldIntegration.java b/src/main/java/org/elasticsearch/watcher/shield/ShieldIntegration.java
new file mode 100644
index 00000000000..1b079b40a78
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/shield/ShieldIntegration.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.shield;
+
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.Injector;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.shield.ShieldPlugin;
+import org.elasticsearch.shield.authc.AuthenticationService;
+import org.elasticsearch.transport.TransportMessage;
+
+/**
+ *
+ */
+public class ShieldIntegration {
+
+ private final boolean installed;
+ private final boolean enabled;
+ private final Object authcService;
+ private final Object userHolder;
+
+ @Inject
+ public ShieldIntegration(Settings settings, Injector injector) {
+ installed = installed(settings);
+ enabled = installed && ShieldPlugin.shieldEnabled(settings);
+ authcService = enabled ? injector.getInstance(AuthenticationService.class) : null;
+ userHolder = enabled ? injector.getInstance(WatcherUserHolder.class) : null;
+ }
+
+ public boolean installed() {
+ return installed;
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public void bindWatcherUser(TransportMessage message) {
+ if (authcService != null) {
+ ((AuthenticationService) authcService).attachUserHeaderIfMissing(message, ((WatcherUserHolder) userHolder).user);
+ }
+ }
+
+ static boolean installed(Settings settings) {
+ try {
+ Class clazz = settings.getClassLoader().loadClass("org.elasticsearch.shield.ShieldPlugin");
+ return clazz != null;
+ } catch (ClassNotFoundException e) {
+ return false;
+ }
+ }
+
+ static boolean enabled(Settings settings) {
+ return installed(settings) && ShieldPlugin.shieldEnabled(settings);
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/shield/WatcherShieldModule.java b/src/main/java/org/elasticsearch/watcher/shield/WatcherShieldModule.java
new file mode 100644
index 00000000000..40a1d13c1dc
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/shield/WatcherShieldModule.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.shield;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.PreProcessModule;
+import org.elasticsearch.common.inject.util.Providers;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.shield.authz.AuthorizationModule;
+import org.elasticsearch.shield.authz.Privilege;
+
+/**
+ *
+ */
+public class WatcherShieldModule extends AbstractModule implements PreProcessModule {
+
+ private final ESLogger logger;
+
+ private final boolean enabled;
+
+ private final WatcherUserHolder userHolder;
+
+ public WatcherShieldModule(Settings settings) {
+ this.logger = Loggers.getLogger(WatcherShieldModule.class, settings);
+ this.enabled = ShieldIntegration.enabled(settings);
+ if (enabled) {
+ userHolder = new WatcherUserHolder();
+ registerClusterPrivilege("manage_watcher", "cluster:admin/watcher/*", "cluster:monitor/watcher/*");
+ registerClusterPrivilege("monitor_watcher", "cluster:monitor/watcher/*");
+ } else {
+ userHolder = null;
+ }
+ }
+
+ void registerClusterPrivilege(String name, String... patterns) {
+ try {
+ Privilege.Cluster.addCustom(name, patterns);
+ } catch (Exception se) {
+ logger.warn("could not register cluster privilege [{}]", name);
+
+ // we need to prevent bubbling the shield exception here for the tests. In the tests
+ // we create multiple nodes in the same jvm and since the custom cluster is a static binding
+ // multiple nodes will try to add the same privileges multiple times.
+ }
+ }
+
+ @Override
+ public void processModule(Module module) {
+ if (enabled && module instanceof AuthorizationModule) {
+ ((AuthorizationModule) module).registerReservedRole(userHolder.role);
+ }
+ }
+
+ @Override
+ protected void configure() {
+ bind(ShieldIntegration.class).asEagerSingleton();
+ bind(WatcherUserHolder.class).toProvider(Providers.of(userHolder));
+ }
+}
diff --git a/src/main/java/org/elasticsearch/watcher/shield/WatcherUserHolder.java b/src/main/java/org/elasticsearch/watcher/shield/WatcherUserHolder.java
new file mode 100644
index 00000000000..5ce1773cd8f
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/shield/WatcherUserHolder.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.shield;
+
+import org.elasticsearch.shield.User;
+import org.elasticsearch.shield.authz.Permission;
+import org.elasticsearch.shield.authz.Privilege;
+
+/**
+ *
+ */
+public class WatcherUserHolder {
+
+ static final String NAME = "__watcher_user";
+ static final String[] ROLE_NAMES = new String[] { "__watcher_role" };
+
+ final Permission.Global.Role role = Permission.Global.Role.builder(ROLE_NAMES[0])
+ .set(Privilege.Cluster.action("indices:admin/template/put"))
+
+ // for now, the watches will be executed under the watcher user, meaning, all actions
+ // taken as part of the execution will be executed on behalf of this user. this includes
+ // the index action, search input and search transform. For this reason the watcher user
+ // requires full access to all indices in the cluster.
+ //
+ // at later phases we'll want to execute the watch on behalf of the user who registers
+ // it. this will require some work to attache/persist that user to/with the watch.
+ .add(Privilege.Index.ALL, "*")
+
+ .build();
+
+ final User user = new User.Simple(NAME, ROLE_NAMES);
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java
index 6f1b80af057..f4e3e49f3da 100644
--- a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java
+++ b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java
@@ -9,14 +9,15 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
-import org.elasticsearch.common.base.Charsets;
-import org.elasticsearch.watcher.watch.WatchStore;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
+import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.watcher.shield.ShieldIntegration;
+import org.elasticsearch.watcher.watch.WatchStore;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -31,12 +32,14 @@ public class TemplateUtils extends AbstractComponent {
private final static Pattern TEMPLATE_VERSION_PATTERN = Pattern.compile("watcher.template_version\"\\s*:\\s*\"?(\\d+)\"?");
- private final TransportPutIndexTemplateAction transportPutIndexTemplateAction;
+ private final ShieldIntegration shieldIntegration;
+ private final TransportPutIndexTemplateAction action;
@Inject
- public TemplateUtils(Settings settings, TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
+ public TemplateUtils(Settings settings, TransportPutIndexTemplateAction action, ShieldIntegration shieldIntegration) {
super(settings);
- this.transportPutIndexTemplateAction = transportPutIndexTemplateAction;
+ this.action = action;
+ this.shieldIntegration = shieldIntegration;
}
/**
@@ -83,8 +86,10 @@ public class TemplateUtils extends AbstractComponent {
}
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
+ shieldIntegration.bindWatcherUser(request);
+
// We're already running on the master and TransportPutIndexTemplateAction#executor() is SAME, so it is ok to wait:
- ActionFuture future = transportPutIndexTemplateAction.execute(request);
+ ActionFuture future = action.execute(request);
PutIndexTemplateResponse response = future.actionGet();
} catch (IOException e) {
// if we're not sure of the template, we can't send data... re-raise exception.
diff --git a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java
index 5ee6d384f39..c0583364a4f 100644
--- a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java
+++ b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java
@@ -6,21 +6,21 @@
package org.elasticsearch.watcher.support.init.proxy;
import org.elasticsearch.action.ActionFuture;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
-import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.TransportMessage;
+import org.elasticsearch.watcher.shield.ShieldIntegration;
+import org.elasticsearch.watcher.support.init.InitializingService;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
@@ -28,13 +28,19 @@ import org.elasticsearch.common.unit.TimeValue;
*/
public class ClientProxy implements InitializingService.Initializable {
+ private final ShieldIntegration shieldIntegration;
private Client client;
+ @Inject
+ public ClientProxy(ShieldIntegration shieldIntegration) {
+ this.shieldIntegration = shieldIntegration;
+ }
+
/**
* Creates a proxy to the given client (can be used for testing)
*/
public static ClientProxy of(Client client) {
- ClientProxy proxy = new ClientProxy();
+ ClientProxy proxy = new ClientProxy(null);
proxy.client = client;
return proxy;
}
@@ -48,43 +54,38 @@ public class ClientProxy implements InitializingService.Initializable {
return client.admin();
}
- public ActionFuture index(IndexRequest request) {
- return client.index(request);
- }
-
- public void index(IndexRequest request, ActionListener listener) {
- client.index(request, listener);
- }
-
- public IndexRequestBuilder prepareIndex(String index, String type, String id) {
- return client.prepareIndex(index, type, id);
+ public IndexResponse index(IndexRequest request) {
+ return client.index(preProcess(request)).actionGet();
}
public ActionFuture delete(DeleteRequest request) {
- return client.delete(request);
- }
-
- public GetRequestBuilder prepareGet(String index, String type, String id) {
- return client.prepareGet(index, type, id);
+ return client.delete(preProcess(request));
}
public SearchResponse search(SearchRequest request) {
- return client.search(request).actionGet();
+ return client.search(preProcess(request)).actionGet();
}
public SearchResponse searchScroll(String scrollId, TimeValue timeout) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout);
- return client.searchScroll(request).actionGet();
+ return client.searchScroll(preProcess(request)).actionGet();
}
public ClearScrollResponse clearScroll(String scrollId) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
- return client.clearScroll(request).actionGet();
+ return client.clearScroll(preProcess(request)).actionGet();
}
public RefreshResponse refresh(RefreshRequest request) {
- return client.admin().indices().refresh(request).actionGet();
+ return client.admin().indices().refresh(preProcess(request)).actionGet();
+ }
+
+ M preProcess(M message) {
+ if (shieldIntegration != null) {
+ shieldIntegration.bindWatcherUser(message);
+ }
+ return message;
}
}
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchAction.java
index e314d8bacd2..51edde52d02 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/ack/AckWatchAction.java
@@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class AckWatchAction extends WatcherAction {
public static final AckWatchAction INSTANCE = new AckWatchAction();
- public static final String NAME = "indices:data/write/watch/ack";
+ public static final String NAME = "cluster:admin/watcher/watch/ack";
private AckWatchAction() {
super(NAME);
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchAction.java
index cb0eafd55cd..a943138b951 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchAction.java
@@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class DeleteWatchAction extends WatcherAction {
public static final DeleteWatchAction INSTANCE = new DeleteWatchAction();
- public static final String NAME = "indices:data/write/watch/delete";
+ public static final String NAME = "cluster:admin/watcher/watch/delete";
private DeleteWatchAction() {
super(NAME);
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchAction.java
index 3be38bb4ec9..499ea0bf9fa 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/get/GetWatchAction.java
@@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class GetWatchAction extends WatcherAction {
public static final GetWatchAction INSTANCE = new GetWatchAction();
- public static final String NAME = "indices:data/read/watch/get";
+ public static final String NAME = "cluster:monitor/watcher/watch/get";
private GetWatchAction() {
super(NAME);
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchAction.java
index 423a50fa2ef..d8f68b0fcbd 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/put/PutWatchAction.java
@@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class PutWatchAction extends WatcherAction {
public static final PutWatchAction INSTANCE = new PutWatchAction();
- public static final String NAME = "indices:data/write/watch/put";
+ public static final String NAME = "cluster:admin/watcher/watch/put";
private PutWatchAction() {
super(NAME);
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/service/TransportWatcherServiceAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/service/TransportWatcherServiceAction.java
index 09a9734aaf1..af0b96b30fc 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/service/TransportWatcherServiceAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/service/TransportWatcherServiceAction.java
@@ -27,8 +27,8 @@ public class TransportWatcherServiceAction extends TransportMasterNodeOperationA
private final WatcherLifeCycleService lifeCycleService;
@Inject
- public TransportWatcherServiceAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, WatcherLifeCycleService lifeCycleService) {
- super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
+ public TransportWatcherServiceAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, WatcherLifeCycleService lifeCycleService) {
+ super(settings, WatcherServiceAction.NAME, transportService, clusterService, threadPool, actionFilters);
this.lifeCycleService = lifeCycleService;
}
diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsAction.java
index f5f1ccf1eec..1c5e3ab881a 100644
--- a/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsAction.java
+++ b/src/main/java/org/elasticsearch/watcher/transport/actions/stats/WatcherStatsAction.java
@@ -14,7 +14,7 @@ import org.elasticsearch.client.Client;
public class WatcherStatsAction extends WatcherAction {
public static final WatcherStatsAction INSTANCE = new WatcherStatsAction();
- public static final String NAME = "cluster/watcher/stats";
+ public static final String NAME = "cluster:monitor/watcher/stats";
private WatcherStatsAction() {
super(NAME);
diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java
index 12137fb3a2c..7ebe3a0a7f0 100644
--- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java
+++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java
@@ -137,7 +137,7 @@ public class WatchStore extends AbstractComponent {
ensureStarted();
Watch watch = watchParser.parse(name, false, source);
IndexRequest indexRequest = createIndexRequest(name, source);
- IndexResponse response = client.index(indexRequest).actionGet();
+ IndexResponse response = client.index(indexRequest);
watch.status().version(response.getVersion());
Watch previous = watches.put(name, watch);
return new WatchPut(previous, watch, response);
@@ -160,7 +160,7 @@ public class WatchStore extends AbstractComponent {
ensureStarted();
assert watch == watches.get(watch.name()) : "update watch can only be applied to an already loaded watch";
BytesReference source = JsonXContent.contentBuilder().value(watch).bytes();
- IndexResponse response = client.index(createIndexRequest(watch.name(), source)).actionGet();
+ IndexResponse response = client.index(createIndexRequest(watch.name(), source));
watch.status().version(response.getVersion());
// Don't need to update the watches, since we are working on an instance from it.
}
diff --git a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java
index 21b6d468e09..00db34399f0 100644
--- a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java
+++ b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java
@@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher;
-import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@@ -13,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.concurrent.MoreExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchTestCase;
@@ -37,7 +37,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
@Before
public void prepareServices() {
threadPool = mock(ThreadPool.class);
- when(threadPool.executor(anyString())).thenReturn(MoreExecutors.sameThreadExecutor());
+ when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watchService = mock(WatchService.class);
clusterService = mock(ClusterService.class);
indicesService = mock(IndicesService.class);
diff --git a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java
index 51f2ff9abf5..16c7d61c6bf 100644
--- a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java
+++ b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java
@@ -7,16 +7,11 @@ package org.elasticsearch.watcher.condition.script;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.node.settings.NodeSettingsService;
-import org.elasticsearch.watcher.watch.WatchExecutionContext;
-import org.elasticsearch.watcher.watch.Payload;
-import org.elasticsearch.watcher.support.Script;
-import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
-import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
@@ -29,6 +24,11 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
+import org.elasticsearch.watcher.support.Script;
+import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
+import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +42,7 @@ import static org.mockito.Mockito.when;
/**
*/
-public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests {
+public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTests {
private ThreadPool tp = null;
private ScriptServiceProxy scriptService;
@@ -67,8 +67,10 @@ public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests {
@Test
public void testExecute_withAggs() throws Exception {
- createIndex("my-index", client().admin().indices().prepareCreate("my-index")
- .addMapping("my-type", "_timestamp", "enabled=true"));
+
+ client().admin().indices().prepareCreate("my-index")
+ .addMapping("my-type", "_timestamp", "enabled=true")
+ .get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:00").setSource("{}").get();
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:10").setSource("{}").get();
diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java
index fbc43975c19..b275183e473 100644
--- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java
+++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreLifeCycleTest.java
@@ -12,7 +12,7 @@ import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.simple.AlwaysTrueCondition;
import org.elasticsearch.watcher.support.clock.SystemClock;
-import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
+import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test;
@@ -22,12 +22,12 @@ import static org.hamcrest.Matchers.*;
/**
*/
-public class HistoryStoreLifeCycleTest extends AbstractWatcherSingleNodeTests {
+public class HistoryStoreLifeCycleTest extends AbstractWatcherIntegrationTests {
@Test
public void testPutLoadUpdate() throws Exception {
Condition condition = new AlwaysTrueCondition(logger);
- HistoryStore historyStore = getInstanceFromNode(HistoryStore.class);
+ HistoryStore historyStore = getInstanceFromMaster(HistoryStore.class);
Watch watch = new Watch("_name", SystemClock.INSTANCE, null, null, condition, null, null, null, null, null);
// Put watch records and verify that these are stored
@@ -43,7 +43,7 @@ public class HistoryStoreLifeCycleTest extends AbstractWatcherSingleNodeTests {
}
// Load the stored watch records
- ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+ ClusterService clusterService = getInstanceFromMaster(ClusterService.class);
Collection records = historyStore.loadRecords(clusterService.state(), WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, notNullValue());
assertThat(records, hasSize(watchRecords.length));
diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java
index 5a67ff0d916..9d96242dd09 100644
--- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java
+++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java
@@ -8,7 +8,6 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
@@ -26,7 +25,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit;
@@ -43,6 +41,7 @@ import org.junit.Test;
import java.util.Collection;
+import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -76,21 +75,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.metadata()).thenReturn(null);
WatchRecord watchRecord = new WatchRecord(watch, new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
- IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
- when(builder.setSource(any(XContentBuilder.class))).thenReturn(builder);
- when(builder.setOpType(IndexRequest.OpType.CREATE)).thenReturn(builder);
IndexResponse indexResponse = mock(IndexResponse.class);
long version = randomLong();
when(indexResponse.getVersion()).thenReturn(version);
- when(builder.get()).thenReturn(indexResponse);
- when(clientProxy.prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z")).thenReturn(builder);
+ when(clientProxy.index(indexRequest(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z", IndexRequest.OpType.CREATE))).thenReturn(indexResponse);
historyStore.put(watchRecord);
assertThat(watchRecord.version(), equalTo(version));
-
- verify(builder, times(1)).setSource(any(XContentBuilder.class));
- verify(builder, times(1)).setOpType(IndexRequest.OpType.CREATE);
- verify(clientProxy, times(1)).prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z");
}
@Test
@@ -103,21 +94,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
WatchRecord watchRecord = new WatchRecord(watch, new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
watchRecord.version(4l);
- IndexRequestBuilder builder = mock(IndexRequestBuilder.class);
- when(builder.setSource(any(BytesReference.class))).thenReturn(builder);
- when(builder.setVersion(4l)).thenReturn(builder);
IndexResponse indexResponse = mock(IndexResponse.class);
long version = randomLong();
when(indexResponse.getVersion()).thenReturn(version);
- when(builder.get()).thenReturn(indexResponse);
- when(clientProxy.prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z")).thenReturn(builder);
+ when(clientProxy.index(indexRequest(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z", 4L, null))).thenReturn(indexResponse);
historyStore.update(watchRecord);
assertThat(watchRecord.version(), equalTo(version));
-
- verify(builder, times(1)).setSource(any(BytesReference.class));
- verify(builder, times(1)).setVersion(4l);
- verify(clientProxy, times(1)).prepareIndex(".watch_history_1970-01-01", HistoryStore.DOC_TYPE, "_name#1970-01-01T00:00:00.000Z");
}
@Test
diff --git a/src/test/java/org/elasticsearch/watcher/scheduler/schedule/ScheduleTestCase.java b/src/test/java/org/elasticsearch/watcher/scheduler/schedule/ScheduleTestCase.java
index fc4f226126a..b217a1a4c2b 100644
--- a/src/test/java/org/elasticsearch/watcher/scheduler/schedule/ScheduleTestCase.java
+++ b/src/test/java/org/elasticsearch/watcher/scheduler/schedule/ScheduleTestCase.java
@@ -5,7 +5,7 @@
*/
package org.elasticsearch.watcher.scheduler.schedule;
-import com.google.common.primitives.Ints;
+import org.elasticsearch.common.primitives.Ints;
import org.elasticsearch.watcher.scheduler.schedule.IntervalSchedule.Interval.Unit;
import org.elasticsearch.watcher.scheduler.schedule.support.*;
import org.elasticsearch.common.xcontent.ToXContent;
diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
index 43d6ff7f252..6e2646dc30e 100644
--- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
@@ -5,17 +5,22 @@
*/
package org.elasticsearch.watcher.test;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.netty.util.internal.SystemPropertyUtil;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@@ -23,7 +28,10 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.shield.ShieldPlugin;
+import org.elasticsearch.shield.authc.esusers.ESUsersRealm;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
@@ -51,8 +59,10 @@ import org.elasticsearch.watcher.watch.WatchService;
import org.junit.After;
import org.junit.Before;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.file.Path;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -72,13 +82,21 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
private TimeWarp timeWarp;
+ boolean shieldEnabled = shieldEnabled();
+ private TransportClient shieldWatcherTransportClient;
+ private WatcherClient shieldWatcherClient;
+
@Override
protected Settings nodeSettings(int nodeOrdinal) {
- return ImmutableSettings.builder()
+ ImmutableSettings.Builder builder = ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
- .put("plugin.types", timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName())
- .build();
+ .put("plugin.types",
+ (timeWarped() ? TimeWarpedWatcherPlugin.class.getName() : WatcherPlugin.class.getName()) + "," +
+ (shieldEnabled ? ShieldPlugin.class.getName() + "," : "") +
+ LicensePlugin.class.getName())
+ .put(ShieldSettings.settings(shieldEnabled));
+ return builder.build();
}
/**
@@ -91,8 +109,46 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return timeWarpEnabled;
}
+ /**
+ * Override and returns {@code false} to force running without shield
+ */
+ protected boolean shieldEnabled() {
+ return randomBoolean();
+ }
+
@Before
- public void setupTimeWarp() throws Exception {
+ public void _setup() throws Exception {
+ setupTimeWarp();
+ startWatcherIfNodesExist();
+ }
+
+ @After
+ public void _cleanup() throws Exception {
+ // Clear all internal watcher state for the next test method:
+ logger.info("[{}#{}]: clearing watches", getTestClass().getSimpleName(), getTestName());
+ stopWatcher();
+ if (shieldWatcherTransportClient != null) {
+ shieldWatcherTransportClient.close();
+ }
+ }
+
+ @Override
+ protected Settings transportClientSettings() {
+ if (!shieldEnabled) {
+ return ImmutableSettings.builder()
+ .put(super.transportClientSettings())
+ .put("plugin.types", WatcherPlugin.class.getName())
+ .build();
+ }
+
+ return ImmutableSettings.builder()
+ .put("client.transport.sniff", false)
+ .put("plugin.types", ShieldPlugin.class.getName() + "," + WatcherPlugin.class.getName())
+ .put("shield.user", "admin:changeme")
+ .build();
+ }
+
+ private void setupTimeWarp() throws Exception {
if (timeWarped()) {
timeWarp = new TimeWarp(
internalTestCluster().getInstance(SchedulerMock.class, internalTestCluster().getMasterName()),
@@ -100,6 +156,20 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
}
+ private void startWatcherIfNodesExist() throws Exception {
+ if (internalTestCluster().size() > 0) {
+ WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
+ if (response.getWatchServiceState() == WatchService.State.STOPPED) {
+ logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
+ startWatcher();
+ } else {
+ logger.info("[{}#{}]: not starting watcher, because watcher is in state [{}]", getTestClass().getSimpleName(), getTestName(), response.getWatchServiceState());
+ }
+ } else {
+ logger.info("[{}#{}]: not starting watcher, because test cluster has no nodes", getTestClass().getSimpleName(), getTestName());
+ }
+ }
+
protected TimeWarp timeWarp() {
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
return timeWarp;
@@ -117,28 +187,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return new WatcherWrappingCluster(seed, testCluster);
}
- @Before
- public void startWatcherIfNodesExist() throws Exception {
- if (internalTestCluster().size() > 0) {
- WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
- if (response.getWatchServiceState() == WatchService.State.STOPPED) {
- logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
- startWatcher();
- } else {
- logger.info("[{}#{}]: not starting watcher, because watcher is in state [{}]", getTestClass().getSimpleName(), getTestName(), response.getWatchServiceState());
- }
- } else {
- logger.info("[{}#{}]: not starting watcher, because test cluster has no nodes", getTestClass().getSimpleName(), getTestName());
- }
- }
-
- @After
- public void clearWatches() throws Exception {
- // Clear all internal watcher state for the next test method:
- logger.info("[{}#{}]: clearing watches", getTestClass().getSimpleName(), getTestName());
- stopWatcher();
- }
-
protected long docCount(String index, String type, QueryBuilder query) {
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
}
@@ -210,16 +258,30 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return builder.bytes();
}
+ protected T getInstanceFromMaster(Class type) {
+ return internalTestCluster().getInstance(type, internalTestCluster().getMasterName());
+ }
+
protected Watch.Parser watchParser() {
- return internalTestCluster().getInstance(Watch.Parser.class, internalTestCluster().getMasterName());
+ return getInstanceFromMaster(Watch.Parser.class);
}
protected Scheduler scheduler() {
- return internalTestCluster().getInstance(Scheduler.class, internalTestCluster().getMasterName());
+ return getInstanceFromMaster(Scheduler.class);
}
protected WatcherClient watcherClient() {
- return internalTestCluster().getInstance(WatcherClient.class);
+ return shieldEnabled ?
+ new WatcherClient(internalTestCluster().transportClient()) :
+ new WatcherClient(client());
+// if (shieldEnabled) {
+// if (shieldWatcherClient == null) {
+// shieldWatcherClient = createShieldWatcherClient();
+// }
+// return shieldWatcherClient;
+// }
+// WatcherClient client = internalTestCluster().getInstance(WatcherClient.class);
+// return client;
}
protected ScriptServiceProxy scriptService() {
@@ -476,4 +538,83 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
}
+
+ /** Shield related settings */
+
+ static class ShieldSettings {
+
+ public static final String IP_FILTER = "allow: all\n";
+
+ public static final String USERS =
+ "test:{plain}changeme\n" +
+ "admin:{plain}changeme\n" +
+ "monitor:{plain}changeme";
+
+ public static final String USER_ROLES =
+ "test:test\n" +
+ "admin:admin\n" +
+ "monitor:monitor";
+
+ public static final String ROLES =
+ "test:\n" + // a user for the test infra.
+ " cluster: all\n" +
+ " indices:\n" +
+ " '*': all\n" +
+ "\n" +
+ "admin:\n" +
+ " cluster: manage_watcher, cluster:monitor/nodes/info\n" +
+ "\n" +
+ "monitor:\n" +
+ " cluster: monitor_watcher, cluster:monitor/nodes/info\n"
+ ;
+
+ static Settings settings(boolean enabled) {
+ ImmutableSettings.Builder builder = ImmutableSettings.builder();
+ if (!enabled) {
+ return builder.put("shield.enabled", false).build();
+ }
+
+ File folder = createFolder(globalTempDir(), "watcher_shield");
+ return builder.put("shield.enabled", true)
+ .put("shield.user", "test:changeme")
+ .put("shield.authc.realms.esusers.type", ESUsersRealm.TYPE)
+ .put("shield.authc.realms.esusers.order", 0)
+ .put("shield.authc.realms.esusers.files.users", writeFile(folder, "users", USERS))
+ .put("shield.authc.realms.esusers.files.users_roles", writeFile(folder, "users_roles", USER_ROLES))
+ .put("shield.authz.store.files.roles", writeFile(folder, "roles.yml", ROLES))
+ .put("shield.transport.n2n.ip_filter.file", writeFile(folder, "ip_filter.yml", IP_FILTER))
+ .put("shield.audit.enabled", true)
+ .build();
+ }
+
+ static File createFolder(File parent, String name) {
+ File createdFolder = new File(parent, name);
+ //the directory might exist e.g. if the global cluster gets restarted, then we recreate the directory as well
+ if (createdFolder.exists()) {
+ if (!FileSystemUtils.deleteRecursively(createdFolder)) {
+ throw new RuntimeException("could not delete existing temporary folder: " + createdFolder.getAbsolutePath());
+ }
+ }
+ if (!createdFolder.mkdir()) {
+ throw new RuntimeException("could not create temporary folder: " + createdFolder.getAbsolutePath());
+ }
+ return createdFolder;
+ }
+
+ static String writeFile(File folder, String name, String content) {
+ return writeFile(folder, name, content.getBytes(Charsets.UTF_8));
+ }
+
+ static String writeFile(File folder, String name, byte[] content) {
+ Path file = folder.toPath().resolve(name);
+ try {
+ Streams.copy(content, file.toFile());
+ } catch (IOException e) {
+ throw new ElasticsearchException("error writing file in test", e);
+ }
+ return file.toFile().getAbsolutePath();
+ }
+ }
+
+
}
diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java
deleted file mode 100644
index 1b3e63a44ba..00000000000
--- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.watcher.test;
-
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.test.ElasticsearchSingleNodeTest;
-import org.elasticsearch.watcher.WatcherLifeCycleService;
-import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
-import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.util.Collections;
-import java.util.Map;
-
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-
-/**
- *
- */
-public abstract class AbstractWatcherSingleNodeTests extends ElasticsearchSingleNodeTest {
-
- @BeforeClass
- public static void initSuite() throws Exception {
- getInstanceFromNode(WatcherLifeCycleService.class).start();
- }
-
- @AfterClass
- public static void cleanupSuite() throws Exception {
- getInstanceFromNode(WatcherLifeCycleService.class).stop();
- }
-
- @Override
- protected boolean resetNodeAfterTest() {
- return false;
- }
-
- protected IndexResponse index(String index, String type, String id) {
- return index(index, type, id, Collections.emptyMap());
- }
-
- protected IndexResponse index(String index, String type, String id, Map doc) {
- return client().prepareIndex(index, type, id).setSource(doc).get();
- }
-
- protected RefreshResponse refresh() {
- RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
- assertNoFailures(actionGet);
- return actionGet;
- }
-
- protected ClientProxy clientProxy() {
- return ClientProxy.of(client());
- }
-
- protected ScriptServiceProxy scriptService() {
- return getInstanceFromNode(ScriptServiceProxy.class);
- }
-}
diff --git a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java
index 4cc14eade8f..479a1cb2a37 100644
--- a/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java
+++ b/src/test/java/org/elasticsearch/watcher/test/TimeWarpedWatcherPlugin.java
@@ -45,6 +45,10 @@ public class TimeWarpedWatcherPlugin extends WatcherPlugin {
*/
public static class WatcherModule extends org.elasticsearch.watcher.WatcherModule {
+ public WatcherModule(Settings settings) {
+ super(settings);
+ }
+
@Override
public Iterable extends Module> spawnModules() {
List modules = new ArrayList<>();
diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherMatchers.java b/src/test/java/org/elasticsearch/watcher/test/WatcherMatchers.java
new file mode 100644
index 00000000000..63e8abd8263
--- /dev/null
+++ b/src/test/java/org/elasticsearch/watcher/test/WatcherMatchers.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.test;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.mockito.Matchers;
+
+/**
+ *
+ */
+public final class WatcherMatchers {
+
+ private WatcherMatchers() {
+ }
+
+ public static IndexRequest indexRequest(String index, String type, String id) {
+ return Matchers.argThat(indexRequestMatcher(index, type, id));
+ }
+
+ public static IndexRequest indexRequest(String index, String type, String id, IndexRequest.OpType opType) {
+ return Matchers.argThat(indexRequestMatcher(index, type, id).opType(opType));
+ }
+
+ public static IndexRequest indexRequest(String index, String type, String id, Long version, IndexRequest.OpType opType) {
+ return Matchers.argThat(indexRequestMatcher(index, type, id).version(version).opType(opType));
+ }
+
+ public static IndexRequestMatcher indexRequestMatcher(String index, String type, String id) {
+ return new IndexRequestMatcher(index, type, id);
+ }
+
+ public static class IndexRequestMatcher extends TypeSafeMatcher {
+
+ private final String index;
+ private final String type;
+ private final String id;
+ private Long version;
+ private IndexRequest.OpType opType;
+
+ private IndexRequestMatcher(String index, String type, String id) {
+ this.index = index;
+ this.type = type;
+ this.id = id;
+ }
+
+ public IndexRequestMatcher version(long version) {
+ this.version = version;
+ return this;
+ }
+
+ public IndexRequestMatcher opType(IndexRequest.OpType opType) {
+ this.opType = opType;
+ return this;
+ }
+
+ @Override
+ protected boolean matchesSafely(IndexRequest request) {
+ if (!index.equals(request.index()) || !type.equals(request.type()) || !id.equals(request.id())) {
+ return false;
+ }
+ if (version != null && !version.equals(request.version())) {
+ return false;
+ }
+ if (opType != null && !opType.equals(request.opType())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("is index request [" + index + "/" + type + "/" + id + "]");
+ }
+ }
+}
diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
index 41232eecc54..468d1ee755b 100644
--- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
+++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
@@ -5,11 +5,18 @@
*/
package org.elasticsearch.watcher.test;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.watcher.watch.Watch;
-import org.elasticsearch.watcher.watch.WatchExecutionContext;
-import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.joda.time.DateTime;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Actions;
import org.elasticsearch.watcher.actions.email.EmailAction;
@@ -22,23 +29,19 @@ import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.scheduler.schedule.CronSchedule;
-import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.Script;
+import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transform.SearchTransform;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.collect.ImmutableMap;
-import org.elasticsearch.common.joda.time.DateTime;
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.Watch;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
import javax.mail.internet.AddressException;
import java.util.ArrayList;
@@ -149,4 +152,5 @@ public final class WatcherTestUtils {
new TimeValue(0),
new Watch.Status());
}
+
}
diff --git a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java
index 266c22ef4d5..0f93e8e7056 100644
--- a/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java
+++ b/src/test/java/org/elasticsearch/watcher/test/bench/WatcherBenchmark.java
@@ -102,6 +102,10 @@ public class WatcherBenchmark {
public static class WatcherModule extends org.elasticsearch.watcher.WatcherModule {
+ public WatcherModule(Settings settings) {
+ super(settings);
+ }
+
@Override
public Iterable extends Module> spawnModules() {
List modules = new ArrayList<>();
diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java
index 45af82b0a9b..66e80625e90 100644
--- a/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java
+++ b/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java
@@ -6,8 +6,11 @@
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
-import org.elasticsearch.watcher.watch.WatchStore;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.scheduler.schedule.IntervalSchedule;
@@ -16,12 +19,15 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
+import static org.elasticsearch.index.query.QueryBuilders.*;
+import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
@@ -30,12 +36,6 @@ import static org.elasticsearch.watcher.scheduler.schedule.Schedules.cron;
import static org.elasticsearch.watcher.scheduler.schedule.Schedules.interval;
import static org.elasticsearch.watcher.support.Variables.*;
import static org.elasticsearch.watcher.test.WatcherTestUtils.newInputSearchRequest;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
-import static org.elasticsearch.index.query.QueryBuilders.*;
-import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
/**
diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java
index f65259471f0..a40751a9cf6 100644
--- a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java
+++ b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java
@@ -50,6 +50,11 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
return false;
}
+ @Override
+ protected boolean shieldEnabled() {
+ return false;
+ }
+
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings settings = super.nodeSettings(nodeOrdinal);
diff --git a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java
index 93424d1a3f6..388fb3fb6d5 100644
--- a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java
+++ b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java
@@ -8,11 +8,6 @@ package org.elasticsearch.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.watcher.watch.WatchExecutionContext;
-import org.elasticsearch.watcher.watch.Payload;
-import org.elasticsearch.watcher.support.Variables;
-import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
-import org.elasticsearch.watcher.test.AbstractWatcherSingleNodeTests;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
@@ -20,24 +15,29 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.watcher.support.Variables;
+import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
+import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
-import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
+import static org.elasticsearch.watcher.support.WatcherDateUtils.parseDate;
+import static org.elasticsearch.watcher.test.WatcherTestUtils.*;
import static org.hamcrest.Matchers.*;
/**
*
*/
-public class SearchTransformTests extends AbstractWatcherSingleNodeTests {
+public class SearchTransformTests extends AbstractWatcherIntegrationTests {
@Test
public void testApply() throws Exception {