Renamed `WatchService` to `WatcherService` and moved it to the `org.elasticsearch.watcher` package.

Original commit: elastic/x-pack-elasticsearch@5f602ed832
This commit is contained in:
Martijn van Groningen 2015-04-23 13:41:11 +02:00
parent ec8c9046b3
commit 4e8ed283e7
16 changed files with 113 additions and 112 deletions

View File

@ -16,25 +16,24 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.watch.WatchService;
/**
*/
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final WatchService watchService;
private final WatcherService watcherService;
private final ClusterService clusterService;
// Maybe this should be a setting in the cluster settings?
private volatile boolean manuallyStopped;
@Inject
public WatcherLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, WatchService watchService) {
public WatcherLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, WatcherService watcherService) {
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.watchService = watchService;
this.watcherService = watcherService;
clusterService.add(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled.
@ -56,12 +55,12 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
private synchronized void start(ClusterState state) {
watchService.start(state);
watcherService.start(state);
}
private synchronized void stop(boolean manual) {
manuallyStopped = manual;
watchService.stop();
watcherService.stop();
}
@Override
@ -85,11 +84,11 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
final ClusterState state = event.state();
if (!watchService.validate(state)) {
if (!watcherService.validate(state)) {
return;
}
if (watchService.state() == WatchService.State.STOPPED && !manuallyStopped) {
if (watcherService.state() == WatcherService.State.STOPPED && !manuallyStopped) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.watch;
package org.elasticsearch.watcher;
import org.elasticsearch.ElasticsearchIllegalStateException;
@ -13,15 +13,17 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchLockService;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
public class WatchService extends AbstractComponent {
public class WatcherService extends AbstractComponent {
private final TriggerService triggerService;
private final WatchStore watchStore;
@ -30,8 +32,8 @@ public class WatchService extends AbstractComponent {
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@Inject
public WatchService(Settings settings, TriggerService triggerService, WatchStore watchStore, ExecutionService executionService,
WatchLockService watchLockService) {
public WatcherService(Settings settings, TriggerService triggerService, WatchStore watchStore, ExecutionService executionService,
WatchLockService watchLockService) {
super(settings);
this.triggerService = triggerService;
this.watchStore = watchStore;

View File

@ -18,7 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
/**
@ -26,13 +26,13 @@ import org.elasticsearch.watcher.watch.WatchStore;
*/
public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequest, AckWatchResponse> {
private final WatchService watchService;
private final WatcherService watcherService;
@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatchService watchService, LicenseService licenseService) {
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, AckWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.watchService = watchService;
this.watcherService = watcherService;
}
@Override
@ -53,7 +53,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
@Override
protected void masterOperation(AckWatchRequest request, ClusterState state, ActionListener<AckWatchResponse> listener) throws ElasticsearchException {
try {
AckWatchResponse response = new AckWatchResponse(watchService.ackWatch(request.getId()));
AckWatchResponse response = new AckWatchResponse(watcherService.ackWatch(request.getId()));
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);

View File

@ -19,7 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
/**
@ -27,13 +27,13 @@ import org.elasticsearch.watcher.watch.WatchStore;
*/
public class TransportDeleteWatchAction extends WatcherTransportAction<DeleteWatchRequest, DeleteWatchResponse> {
private final WatchService watchService;
private final WatcherService watcherService;
@Inject
public TransportDeleteWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatchService watchService, LicenseService licenseService) {
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, DeleteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.watchService = watchService;
this.watcherService = watcherService;
}
@Override
@ -54,7 +54,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction<DeleteWat
@Override
protected void masterOperation(DeleteWatchRequest request, ClusterState state, ActionListener<DeleteWatchResponse> listener) throws ElasticsearchException {
try {
DeleteResponse deleteResponse = watchService.deleteWatch(request.getId()).deleteResponse();
DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId()).deleteResponse();
DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.isFound());
listener.onResponse(response);
} catch (Exception e) {

View File

@ -22,7 +22,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.IOException;
@ -32,13 +32,13 @@ import java.io.IOException;
*/
public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequest, GetWatchResponse> {
private final WatchService watchService;
private final WatcherService watcherService;
@Inject
public TransportGetWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatchService watchService, LicenseService licenseService) {
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, GetWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.watchService = watchService;
this.watcherService = watcherService;
}
@Override
@ -59,7 +59,7 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Override
protected void masterOperation(GetWatchRequest request, ClusterState state, ActionListener<GetWatchResponse> listener) throws ElasticsearchException {
try {
Watch watch = watchService.getWatch(request.getId());
Watch watch = watcherService.getWatch(request.getId());
if (watch == null) {
listener.onResponse(new GetWatchResponse(request.getId(), -1, false, null));
return;

View File

@ -19,20 +19,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
/**
*/
public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequest, PutWatchResponse> {
private final WatchService watchService;
private final WatcherService watcherService;
@Inject
public TransportPutWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatchService watchService, LicenseService licenseService) {
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, PutWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.watchService = watchService;
this.watcherService = watcherService;
}
@Override
@ -53,7 +53,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
@Override
protected void masterOperation(PutWatchRequest request, ClusterState state, ActionListener<PutWatchResponse> listener) throws ElasticsearchException {
try {
IndexResponse indexResponse = watchService.putWatch(request.getId(), request.getSource());
IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource());
listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()));
} catch (Exception e) {
listener.onFailure(e);

View File

@ -21,22 +21,22 @@ import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
/**
* Performs the stats operation.
*/
public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherStatsRequest, WatcherStatsResponse> {
private final WatchService watchService;
private final WatcherService watcherService;
private final ExecutionService executionService;
@Inject
public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatchService watchService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService,
ExecutionService executionService, LicenseService licenseService) {
super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService);
this.watchService = watchService;
this.watcherService = watcherService;
this.executionService = executionService;
}
@ -58,9 +58,9 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
@Override
protected void masterOperation(WatcherStatsRequest request, ClusterState state, ActionListener<WatcherStatsResponse> listener) throws ElasticsearchException {
WatcherStatsResponse statsResponse = new WatcherStatsResponse();
statsResponse.setWatchServiceState(watchService.state());
statsResponse.setWatchServiceState(watcherService.state());
statsResponse.setWatchExecutionQueueSize(executionService.queueSize());
statsResponse.setWatchesCount(watchService.watchesCount());
statsResponse.setWatchesCount(watcherService.watchesCount());
statsResponse.setWatchExecutionQueueMaxSize(executionService.largestQueueSize());
statsResponse.setVersion(WatcherVersion.CURRENT);
statsResponse.setBuild(WatcherBuild.CURRENT);

View File

@ -7,7 +7,7 @@ package org.elasticsearch.watcher.transport.actions.stats;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.watcher.WatcherBuild;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -19,7 +19,7 @@ public class WatcherStatsResponse extends ActionResponse {
private WatcherVersion version;
private WatcherBuild build;
private long watchesCount;
private WatchService.State watchServiceState;
private WatcherService.State watchServiceState;
private long watchExecutionQueueSize;
private long watchExecutionQueueMaxSize;
@ -62,11 +62,11 @@ public class WatcherStatsResponse extends ActionResponse {
/**
* @return The state of the watch service.
*/
public WatchService.State getWatchServiceState() {
public WatcherService.State getWatchServiceState() {
return watchServiceState;
}
void setWatchServiceState(WatchService.State watcherServiceState) {
void setWatchServiceState(WatcherService.State watcherServiceState) {
this.watchServiceState = watcherServiceState;
}
@ -98,7 +98,7 @@ public class WatcherStatsResponse extends ActionResponse {
watchesCount = in.readLong();
watchExecutionQueueSize = in.readLong();
watchExecutionQueueMaxSize = in.readLong();
watchServiceState = WatchService.State.fromId(in.readByte());
watchServiceState = WatcherService.State.fromId(in.readByte());
version = WatcherVersion.readVersion(in);
build = WatcherBuild.readBuild(in);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.watch;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.watcher.WatcherService;
/**
*
@ -16,7 +17,7 @@ public class WatchModule extends AbstractModule {
protected void configure() {
bind(Watch.Parser.class).asEagerSingleton();
bind(WatchLockService.class).asEagerSingleton();
bind(WatchService.class).asEagerSingleton();
bind(WatcherService.class).asEagerSingleton();
bind(WatchStore.class).asEagerSingleton();
}
}

View File

@ -17,7 +17,6 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.watch.WatchService;
import org.junit.Before;
import org.junit.Test;
@ -29,7 +28,7 @@ import static org.mockito.Mockito.*;
public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
private ThreadPool threadPool;
private WatchService watchService;
private WatcherService watcherService;
private ClusterService clusterService;
private IndicesService indicesService;
private WatcherLifeCycleService lifeCycleService;
@ -38,10 +37,10 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
public void prepareServices() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watchService = mock(WatchService.class);
watcherService = mock(WatcherService.class);
clusterService = mock(ClusterService.class);
indicesService = mock(IndicesService.class);
lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, watchService);
lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, watcherService);
}
@Test
@ -50,25 +49,25 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watchService.validate(clusterState)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherService.State.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(1)).start(clusterState);
verify(watchService, never()).stop();
verify(watcherService, times(1)).start(clusterState);
verify(watcherService, never()).stop();
// Trying to start a second time, but that should have no affect.
when(watchService.state()).thenReturn(WatchService.State.STARTED);
when(watcherService.state()).thenReturn(WatcherService.State.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(1)).start(clusterState);
verify(watchService, never()).stop();
verify(watcherService, times(1)).start(clusterState);
verify(watcherService, never()).stop();
// Stopping because local node is no longer master node
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
ClusterState noMasterClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", noMasterClusterState, noMasterClusterState));
verify(watchService, times(1)).stop();
verify(watchService, times(1)).start(clusterState);
verify(watcherService, times(1)).stop();
verify(watcherService, times(1)).start(clusterState);
}
@Test
@ -77,53 +76,53 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.nodes(nodes).build();
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watcherService.state()).thenReturn(WatcherService.State.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, never()).start(any(ClusterState.class));
verify(watcherService, never()).start(any(ClusterState.class));
}
@Test
public void testManualStartStop() {
lifeCycleService.start();
verify(watchService, times(1)).start(any(ClusterState.class));
verify(watchService, never()).stop();
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, never()).stop();
lifeCycleService.stop();
verify(watchService, times(1)).start(any(ClusterState.class));
verify(watchService, times(1)).stop();
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, times(1)).stop();
// Starting via cluster state update, we shouldn't start because we have been stopped manually.
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watcherService.state()).thenReturn(WatcherService.State.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(1)).start(any(ClusterState.class));
verify(watchService, times(1)).stop();
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, times(1)).stop();
// we can only start, if we start manually
lifeCycleService.start();
verify(watchService, times(2)).start(any(ClusterState.class));
verify(watchService, times(1)).stop();
verify(watcherService, times(2)).start(any(ClusterState.class));
verify(watcherService, times(1)).stop();
// stop watcher via cluster state update
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watcherService.state()).thenReturn(WatcherService.State.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(2)).start(any(ClusterState.class));
verify(watchService, times(2)).stop();
verify(watcherService, times(2)).start(any(ClusterState.class));
verify(watcherService, times(2)).stop();
// starting watcher via cluster state update, which should work, because we manually started before
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.validate(clusterState)).thenReturn(true);
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherService.State.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(3)).start(any(ClusterState.class));
verify(watchService, times(2)).stop();
verify(watcherService, times(3)).start(any(ClusterState.class));
verify(watcherService, times(2)).stop();
}
}

View File

@ -58,7 +58,7 @@ import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.Schedules;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.junit.After;
import org.junit.Before;
@ -178,10 +178,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
private void startWatcherIfNodesExist() throws Exception {
if (internalTestCluster().size() > 0) {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
if (response.getWatchServiceState() == WatchService.State.STOPPED) {
if (response.getWatchServiceState() == WatcherService.State.STOPPED) {
logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
startWatcher();
} else if (response.getWatchServiceState() == WatchService.State.STARTING) {
} else if (response.getWatchServiceState() == WatcherService.State.STARTING) {
logger.info("[{}#{}]: watcher is starting, waiting for it to get in a started state", getTestClass().getSimpleName(), getTestName());
ensureWatcherStarted(false);
} else {
@ -296,8 +296,8 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return getInstanceFromMaster(ExecutionService.class);
}
protected WatchService watchService() {
return getInstanceFromMaster(WatchService.class);
protected WatcherService watchService() {
return getInstanceFromMaster(WatcherService.class);
}
protected TriggerService triggerService() {
@ -434,9 +434,9 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
public void run() {
if (useClient) {
assertThat(watcherClient().prepareWatcherStats().get().getWatchServiceState(), is(WatchService.State.STARTED));
assertThat(watcherClient().prepareWatcherStats().get().getWatchServiceState(), is(WatcherService.State.STARTED));
} else {
assertThat(getInstanceFromMaster(WatchService.class).state(), is(WatchService.State.STARTED));
assertThat(getInstanceFromMaster(WatcherService.class).state(), is(WatcherService.State.STARTED));
}
}
});
@ -462,9 +462,9 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
public void run() {
if (useClient) {
assertThat(watcherClient().prepareWatcherStats().get().getWatchServiceState(), is(WatchService.State.STOPPED));
assertThat(watcherClient().prepareWatcherStats().get().getWatchServiceState(), is(WatcherService.State.STOPPED));
} else {
assertThat(getInstanceFromMaster(WatchService.class).state(), is(WatchService.State.STOPPED));
assertThat(getInstanceFromMaster(WatcherService.class).state(), is(WatcherService.State.STOPPED));
}
}
});

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -35,7 +34,7 @@ import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.IOException;
@ -144,7 +143,7 @@ public class WatcherScheduleEngineBenchmark {
Clock clock = node.injector().getInstance(Clock.class);
WatcherClient watcherClient = node.injector().getInstance(WatcherClient.class);
while (watcherClient.prepareWatcherStats().get().getWatchServiceState() != WatchService.State.STARTED) {
while (watcherClient.prepareWatcherStats().get().getWatchServiceState() != WatcherService.State.STARTED) {
Thread.sleep(100);
}
long actualLoadedWatches = watcherClient.prepareWatcherStats().get().getWatchesCount();

View File

@ -36,7 +36,7 @@ import org.elasticsearch.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
import org.junit.Test;
@ -71,7 +71,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
startWatcher();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
}
@ -81,7 +81,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
ensureWatcherStarted();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(0L));
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
@ -125,7 +125,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
startWatcher();
response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThanOrEqualTo(1l));
}
@ -185,7 +185,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
startWatcher();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatchServiceState(), equalTo(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
final long totalHistoryEntries = numberOfWatchRecordsPerIndex * numberOfWatchHistoryIndices;
assertBusy(new Runnable() {

View File

@ -8,7 +8,7 @@ package org.elasticsearch.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
@ -168,8 +168,8 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
}
}, 30, TimeUnit.SECONDS), equalTo(true));
// Ensure that the watch manager doesn't run elsewhere
for (WatchService watchService : internalTestCluster().getInstances(WatchService.class)) {
assertThat(watchService.state(), is(WatchService.State.STOPPED));
for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) {
assertThat(watcherService.state(), is(WatcherService.State.STOPPED));
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.watcher.WatcherBuild;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
@ -39,7 +39,7 @@ public class WatchStatsTests extends AbstractWatcherIntegrationTests {
WatcherStatsRequest watcherStatsRequest = watcherClient().prepareWatcherStats().request();
WatcherStatsResponse response = watcherClient().watcherStats(watcherStatsRequest).actionGet();
assertThat(response.getWatchServiceState(), is(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), is(WatcherService.State.STARTED));
assertThat(response.getExecutionQueueSize(), is(0L));
assertThat(response.getWatchesCount(), is(0L));
assertThat(response.getWatchExecutionQueueMaxSize(), is(timeWarped() ? 1L : 0L));
@ -54,7 +54,7 @@ public class WatchStatsTests extends AbstractWatcherIntegrationTests {
WatcherStatsRequest watcherStatsRequest = watcherClient.prepareWatcherStats().request();
WatcherStatsResponse response = watcherClient.watcherStats(watcherStatsRequest).actionGet();
assertThat(response.getWatchServiceState(), equalTo(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), equalTo(WatcherService.State.STARTED));
SearchRequest searchRequest = WatcherTestUtils.newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
BytesReference watchSource = createWatchSource("* * * * * ? *", searchRequest, "ctx.payload.hits.total == 1");
@ -71,7 +71,7 @@ public class WatchStatsTests extends AbstractWatcherIntegrationTests {
response = watcherClient().watcherStats(watcherStatsRequest).actionGet();
assertThat(response.getWatchServiceState(), is(WatchService.State.STARTED));
assertThat(response.getWatchServiceState(), is(WatcherService.State.STARTED));
assertThat(response.getWatchesCount(), is(1L));
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThan(0L));
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
@ -33,7 +34,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
private TriggerService triggerService;
private WatchStore watchStore;
private WatchService watchService;
private WatcherService watcherService;
private ExecutionService executionService;
private WatchLockService watchLockService;
@ -43,11 +44,11 @@ public class WatchServiceTests extends ElasticsearchTestCase {
watchStore = mock(WatchStore.class);
executionService = mock(ExecutionService.class);
watchLockService = mock(WatchLockService.class);
watchService = new WatchService(ImmutableSettings.EMPTY, triggerService, watchStore, executionService, watchLockService);
Field field = WatchService.class.getDeclaredField("state");
watcherService = new WatcherService(ImmutableSettings.EMPTY, triggerService, watchStore, executionService, watchLockService);
Field field = WatcherService.class.getDeclaredField("state");
field.setAccessible(true);
AtomicReference<WatchService.State> state = (AtomicReference<WatchService.State>) field.get(watchService);
state.set(WatchService.State.STARTED);
AtomicReference<WatcherService.State> state = (AtomicReference<WatcherService.State>) field.get(watcherService);
state.set(WatcherService.State.STARTED);
}
@Test
@ -61,7 +62,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire(any(String.class))).thenReturn(lock);
when(watchStore.put(any(String.class), any(BytesReference.class))).thenReturn(watchPut);
IndexResponse response = watchService.putWatch("_name", new BytesArray("{}"));
IndexResponse response = watcherService.putWatch("_name", new BytesArray("{}"));
assertThat(response, sameInstance(indexResponse));
verify(triggerService, times(1)).add(any(TriggerEngine.Job.class));
@ -84,7 +85,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire(any(String.class))).thenReturn(lock);
when(watchStore.put(any(String.class), any(BytesReference.class))).thenReturn(watchPut);
IndexResponse response = watchService.putWatch("_name", new BytesArray("{}"));
IndexResponse response = watcherService.putWatch("_name", new BytesArray("{}"));
assertThat(response, sameInstance(indexResponse));
verifyZeroInteractions(triggerService);
@ -100,7 +101,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
when(deleteResponse.isFound()).thenReturn(true);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_name")).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watchService.deleteWatch("_name");
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_name");
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verify(triggerService, times(1)).remove("_name");
@ -116,7 +117,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
when(deleteResponse.isFound()).thenReturn(false);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_name")).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watchService.deleteWatch("_name");
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_name");
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verifyZeroInteractions(triggerService);
@ -132,7 +133,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
when(watch.status()).thenReturn(status);
when(watchStore.get("_name")).thenReturn(watch);
Watch.Status result = watchService.ackWatch("_name");
Watch.Status result = watcherService.ackWatch("_name");
assertThat(result, not(sameInstance(status)));
verify(watchStore, times(1)).updateStatus(watch);
@ -148,7 +149,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
when(watch.status()).thenReturn(status);
when(watchStore.get("_name")).thenReturn(watch);
Watch.Status result = watchService.ackWatch("_name");
Watch.Status result = watcherService.ackWatch("_name");
assertThat(result, not(sameInstance(status)));
verify(watchStore, never()).updateStatus(watch);
@ -161,7 +162,7 @@ public class WatchServiceTests extends ElasticsearchTestCase {
when(watchStore.get("_name")).thenReturn(null);
try {
watchService.ackWatch("_name");
watcherService.ackWatch("_name");
fail();
} catch (WatcherException e) {
// expected