Mirror of https://github.com/honeymoose/OpenSearch.git (synced 2025-02-17 10:25:15 +00:00)
Watcher: Ensure state is cleaned properly in watcher life cycle service (elastic/x-pack-elasticsearch#3770)
The WatcherLifeCycleService is responsible for deciding whether watches need to be reloaded while running. In order to do this, the service stores the current local shard allocation IDs in a list. However, this data structure was not always updated when it should have been, for example when no master node is available. This led to unintended reloads even though there was no change in the allocated shards, which in turn led to unwanted executions and unwanted loading of triggered watches. This commit should also fix one of the nastier ongoing test failures, where a test returned with an exception because only parts of watcher had been started. The AbstractWatcherIntegrationTestCase now properly waits until watcher is started before the test case itself begins. Original commit: elastic/x-pack-elasticsearch@097f12a900
parent da7560a079
commit c65528b9f6
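The heart of the change is visible in the diff below: every path that stops or pauses watcher now funnels through clearAllocationIds(), and a reload happens only when the sorted allocation IDs of the local .watches shards actually differ from the previously seen ones. A rough standalone sketch of that tracking pattern follows (simplified names and types for illustration only, not the actual class from this commit):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch of the allocation-id tracking pattern: reload only when the
    // sorted ids of the local shards differ from the previously seen ids,
    // and clear the tracked state on every stop/pause path.
    class AllocationIdTracker {
        private final AtomicReference<List<String>> previousAllocationIds =
                new AtomicReference<>(Collections.emptyList());

        // returns true when the ids changed, i.e. a reload would be required
        boolean updateAndCheck(List<String> currentAllocationIds) {
            List<String> sorted = new ArrayList<>(currentAllocationIds);
            Collections.sort(sorted);
            if (previousAllocationIds.get().equals(sorted) == false) {
                previousAllocationIds.set(Collections.unmodifiableList(sorted));
                return true;
            }
            return false;
        }

        // returns true when there was state to clear (mirrors clearAllocationIds)
        boolean clear() {
            return previousAllocationIds.getAndSet(Collections.emptyList()).isEmpty() == false;
        }
    }

Keeping the stored list sorted and immutable makes the comparison order-independent, and the AtomicReference makes the swap safe when the cluster-state listener and the executor race.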
WatcherLifeCycleService.java:

@@ -21,11 +21,11 @@ import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.upgrade.UpgradeField;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
-import org.elasticsearch.xpack.core.upgrade.UpgradeField;
 import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
 import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
@@ -43,7 +43,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
     private final WatcherService watcherService;
     private final ExecutorService executor;
     private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
-    private volatile WatcherMetaData watcherMetaData;
     private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.


@@ -61,7 +60,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
                 shutDown();
             }
         });
-        watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true));
     }

     public synchronized void stop(String reason) {
@@ -73,7 +71,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         stop("shutdown initiated");
     }

-    private synchronized void start(ClusterState state, boolean manual) {
+    private synchronized void start(ClusterState state) {
         if (shutDown) {
             return;
         }
@@ -86,7 +84,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste

         // If we start from a cluster state update we need to check if previously we stopped manually
         // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
-        if (!manual && watcherMetaData != null && watcherMetaData.manuallyStopped()) {
+        WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
+        if (watcherMetaData != null && watcherMetaData.manuallyStopped()) {
             logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started");
             return;
         }
@@ -99,8 +98,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         }

         if (watcherService.validate(state)) {
-            logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
+            logger.trace("starting... (based on cluster state version [{}])", state.getVersion());
             try {
+                // we need to populate the allocation ids before the next cluster state listener comes in
+                checkAndSetAllocationIds(state, false);
                 watcherService.start(state);
             } catch (Exception e) {
                 logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
@@ -120,68 +121,30 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || shutDown) {
+            clearAllocationIds();
             // wait until the gateway has recovered from disk, otherwise we think may not have .watches and
             // a .triggered_watches index, but they may not have been restored from the cluster state on disk
             return;
         }

         if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
+            clearAllocationIds();
             executor.execute(() -> this.stop("no master node"));
             return;
         }

         if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) {
+            clearAllocationIds();
             executor.execute(() -> this.stop("write level cluster block"));
             return;
         }

-        // find out if watcher was stopped or started manually due to this cluster state change
-        WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
-
-        if (watcherMetaData != null) {
-            this.watcherMetaData = watcherMetaData;
-        }
-
-        boolean currentWatcherStopped = watcherMetaData != null && watcherMetaData.manuallyStopped() == true;
-        if (currentWatcherStopped) {
+        if (isWatcherStoppedManually(event.state())) {
+            clearAllocationIds();
             executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update"));
         } else {
             if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
-                DiscoveryNode localNode = event.state().nodes().getLocalNode();
-                RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
-                IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
-
-                // no watcher index, time to pause, as there are for sure no shards on this node
-                if (watcherIndexMetaData == null) {
-                    if (previousAllocationIds.get().isEmpty() == false) {
-                        previousAllocationIds.set(Collections.emptyList());
-                        executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
-                    }
-                    return;
-                }
-
-                String watchIndex = watcherIndexMetaData.getIndex().getName();
-                List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
-
-                // no local shards, empty out watcher and not waste resources!
-                if (localShards.isEmpty()) {
-                    if (previousAllocationIds.get().isEmpty() == false) {
-                        executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
-                        previousAllocationIds.set(Collections.emptyList());
-                    }
-                    return;
-                }
-
-                List<String> currentAllocationIds = localShards.stream()
-                        .map(ShardRouting::allocationId)
-                        .map(AllocationId::getId)
-                        .collect(Collectors.toList());
-                Collections.sort(currentAllocationIds);
-
-                if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
-                    previousAllocationIds.set(currentAllocationIds);
-                    executor.execute(() -> watcherService.reload(event.state(), "different shards allocated on this node"));
-                }
+                checkAndSetAllocationIds(event.state(), true);
             } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
                 IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
                 IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME,
@@ -191,7 +154,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
                 boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
                         UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
                 if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
-                    executor.execute(() -> start(event.state(), false));
+                    checkAndSetAllocationIds(event.state(), false);
+                    executor.execute(() -> start(event.state()));
                 } else {
                     logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]",
                             isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex);
@@ -200,7 +164,75 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         }
     }

-    public WatcherMetaData watcherMetaData() {
-        return watcherMetaData;
+    /**
+     * check if watcher has been stopped manually via the stop API
+     */
+    private boolean isWatcherStoppedManually(ClusterState state) {
+        WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
+        return watcherMetaData != null && watcherMetaData.manuallyStopped();
+    }
+
+    /**
+     * check and optionally set the current allocation ids
+     *
+     * @param state              the current cluster state
+     * @param callWatcherService should the watcher service be called for starting/stopping/reloading or should this be treated as a
+     *                           dryrun so that the caller is responsible for this
+     */
+    private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherService) {
+        IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
+        if (watcherIndexMetaData == null) {
+            if (clearAllocationIds() && callWatcherService) {
+                executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
+            }
+            return;
+        }
+
+        DiscoveryNode localNode = state.nodes().getLocalNode();
+        RoutingNode routingNode = state.getRoutingNodes().node(localNode.getId());
+        // this can happen if the node does not hold any data
+        if (routingNode == null) {
+            if (clearAllocationIds() && callWatcherService) {
+                executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?"));
+            }
+            return;
+        }
+
+        String watchIndex = watcherIndexMetaData.getIndex().getName();
+        List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
+        // no local shards, empty out watcher and dont waste resources!
+        if (localShards.isEmpty()) {
+            if (clearAllocationIds() && callWatcherService) {
+                executor.execute(() -> watcherService.pauseExecution("no local watcher shards found"));
+            }
+            return;
+        }
+
+        List<String> currentAllocationIds = localShards.stream()
+                .map(ShardRouting::allocationId)
+                .map(AllocationId::getId)
+                .collect(Collectors.toList());
+        Collections.sort(currentAllocationIds);
+
+        if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
+            previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
+            if (callWatcherService) {
+                executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids"));
+            }
+        }
+    }
+
+    /**
+     * clear out current allocation ids if not already happened
+     * @return true, if existing allocation ids were cleaned out, false otherwise
+     */
+    private boolean clearAllocationIds() {
+        List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
+        return previousIds.equals(Collections.emptyList()) == false;
+    }
+
+    // for testing purposes only
+    List<String> allocationIds() {
+        return previousAllocationIds.get();
     }
 }
TransportWatcherStatsAction.java:

@@ -14,10 +14,10 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
 import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
-import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
 import org.elasticsearch.xpack.watcher.WatcherService;
 import org.elasticsearch.xpack.watcher.execution.ExecutionService;
 import org.elasticsearch.xpack.watcher.trigger.TriggerService;
@@ -33,27 +33,24 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
     private final WatcherService watcherService;
     private final ExecutionService executionService;
     private final TriggerService triggerService;
-    private final WatcherLifeCycleService lifeCycleService;

     @Inject
     public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                        ThreadPool threadPool, ActionFilters actionFilters,
                                        IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
-                                       ExecutionService executionService, TriggerService triggerService,
-                                       WatcherLifeCycleService lifeCycleService) {
+                                       ExecutionService executionService, TriggerService triggerService) {
         super(settings, WatcherStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
                 WatcherStatsRequest::new, WatcherStatsRequest.Node::new, ThreadPool.Names.MANAGEMENT,
                 WatcherStatsResponse.Node.class);
         this.watcherService = watcherService;
         this.executionService = executionService;
         this.triggerService = triggerService;
-        this.lifeCycleService = lifeCycleService;
     }

     @Override
     protected WatcherStatsResponse newResponse(WatcherStatsRequest request, List<WatcherStatsResponse.Node> nodes,
                                                List<FailedNodeException> failures) {
-        return new WatcherStatsResponse(clusterService.getClusterName(), lifeCycleService.watcherMetaData(), nodes, failures);
+        return new WatcherStatsResponse(clusterService.getClusterName(), getWatcherMetaData(), nodes, failures);
     }

     @Override
@@ -83,4 +80,11 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
         return statsResponse;
     }

+    private WatcherMetaData getWatcherMetaData() {
+        WatcherMetaData watcherMetaData = clusterService.state().getMetaData().custom(WatcherMetaData.TYPE);
+        if (watcherMetaData == null) {
+            watcherMetaData = new WatcherMetaData(false);
+        }
+        return watcherMetaData;
+    }
 }
WatcherLifeCycleServiceTests.java:

@@ -20,6 +20,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -50,6 +52,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME;
 import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME;
 import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME;
+import static org.hamcrest.Matchers.hasSize;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
@@ -278,15 +281,15 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {

         // set current allocation ids
         lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards));
-        verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards"));
+        verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards found"));

         // no more local hards, lets pause execution
         lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithLocalShards));
-        verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards"));
+        verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found"));

         // no further invocations should happen if the cluster state does not change in regard to local shards
         lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithoutLocalShards));
-        verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards"));
+        verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found"));
     }

     public void testReplicaWasAddedOrRemoved() throws Exception {
@@ -539,6 +542,41 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         verify(watcherService, times(1)).stop(eq("write level cluster block"));
     }

+    public void testStateIsSetImmediately() throws Exception {
+        Index index = new Index(Watch.INDEX, "foo");
+        IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+        indexRoutingTableBuilder.addShard(
+                TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
+        IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
+                .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
+                .numberOfShards(1).numberOfReplicas(0);
+        ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
+                .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
+                        .add(newNode("node_1")))
+                .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
+                .metaData(MetaData.builder()
+                        .put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                        .put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                        .put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
+                        .put(indexMetaDataBuilder)
+                        .build())
+                .build();
+        when(watcherService.validate(state)).thenReturn(true);
+        when(watcherService.state()).thenReturn(WatcherState.STOPPED);
+
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state));
+        verify(watcherService, times(1)).start(eq(state));
+        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+
+        // now do any cluster state upgrade, see that reload gets triggers, but should not
+        when(watcherService.state()).thenReturn(WatcherState.STARTED);
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state));
+        verify(watcherService, never()).pauseExecution(anyString());
+
+        verify(watcherService, never()).reload(eq(state), anyString());
+        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+    }
+
     private List<String> randomIndexPatterns() {
         return IntStream.range(0, between(1, 10))
                 .mapToObj(n -> randomAlphaOfLengthBetween(1, 100))
AbstractWatcherIntegrationTestCase.java:

@@ -185,20 +185,10 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
             timeWarp = new TimeWarp(internalCluster().getInstances(ScheduleTriggerEngineMock.class),
                     (ClockMock)getInstanceFromMaster(Clock.class));
         }
         startWatcherIfNodesExist();
-        createWatcherIndicesOrAliases();
     }

-    @After
-    public void _cleanup() throws Exception {
-        // Clear all internal watcher state for the next test method:
-        logger.info("[#{}]: clearing watcher state", getTestName());
-        stopWatcher();
-    }
-
     private void startWatcherIfNodesExist() throws Exception {
         if (internalCluster().size() > 0) {
             ensureLicenseEnabled();

             if (timeWarped()) {
                 // now that the license is enabled and valid we can freeze all nodes clocks
                 logger.info("[{}#{}]: freezing time on nodes", getTestClass().getSimpleName(), getTestName());
@@ -206,10 +196,19 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
                 internalCluster().setDisruptionScheme(ice);
                 ice.startDisrupting();
             }
-            assertAcked(watcherClient().prepareWatchService().start().get());
+
+            createWatcherIndicesOrAliases();
+            startWatcher();
         }
     }

+    @After
+    public void _cleanup() throws Exception {
+        // Clear all internal watcher state for the next test method:
+        logger.info("[#{}]: clearing watcher state", getTestName());
+        stopWatcher();
+    }
+
     /**
      * In order to test, that .watches and .triggered-watches indices can also point to an alias, we will rarely create those
      * after starting watcher
@@ -237,6 +236,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
                 assertAcked(response);
+                ensureGreen(newIndex);
             }
             logger.info("set alias for .watches index to [{}]", newIndex);
         } else {
             Settings.Builder builder = Settings.builder();
             if (randomBoolean()) {
@@ -270,6 +270,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
                 assertAcked(response);
+                ensureGreen(newIndex);
             }
             logger.info("set alias for .triggered-watches index to [{}]", newIndex);
         } else {
             assertAcked(client().admin().indices().prepareCreate(TriggeredWatchStoreField.INDEX_NAME));
             ensureGreen(TriggeredWatchStoreField.INDEX_NAME);
@@ -277,9 +278,8 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase

         String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(DateTime.now(DateTimeZone.UTC));
         assertAcked(client().admin().indices().prepareCreate(historyIndex));
         logger.info("creating watch history index [{}]", historyIndex);
         ensureGreen(historyIndex);

-        ensureWatcherStarted();
     }
 }
@@ -364,7 +364,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
                             ExecutionState.EXECUTED.id())))
                     .get();
             lastResponse.set(searchResponse);
-            assertThat("could not find executed watch record", searchResponse.getHits().getTotalHits(),
+            assertThat("could not find executed watch record for watch " + watchName, searchResponse.getHits().getTotalHits(),
                     greaterThanOrEqualTo(minimumExpectedWatchActionsWithActionPerformed));
             if (assertConditionMet) {
                 assertThat((Integer) XContentMapValues.extractValue("result.input.payload.hits.total",
@@ -508,12 +508,12 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
     }

     protected void startWatcher() throws Exception {
-        watcherClient().prepareWatchService().start().get();
+        assertAcked(watcherClient().prepareWatchService().start().get());
         ensureWatcherStarted();
     }

     protected void stopWatcher() throws Exception {
-        watcherClient().prepareWatchService().stop().get();
+        assertAcked(watcherClient().prepareWatchService().stop().get());
         ensureWatcherStopped();
     }

BootStrapTests.java:

@@ -12,7 +12,6 @@ import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition;
 import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
 import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
@@ -56,8 +55,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.joda.time.DateTimeZone.UTC;

-@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
-        "org.elasticsearch.xpack.watcher.execution:TRACE")
 public class BootStrapTests extends AbstractWatcherIntegrationTestCase {

     @Override
@@ -192,6 +189,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {

         WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");

+        ensureGreen("output", "my-index");
         int numWatches = 8;
         for (int i = 0; i < numWatches; i++) {
             String watchId = "_id" + i;
TransformIntegrationTests.java:

@@ -14,7 +14,6 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.MockScriptPlugin;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse;
 import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
 import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
@@ -51,7 +50,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;

-@TestLogging("org.elasticsearch.xpack.watcher:DEBUG,org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
 public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCase {

     @Override
@@ -247,5 +245,4 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
         assertThat(response.getHits().getAt(0).getSourceAsMap().size(), equalTo(1));
         assertThat(response.getHits().getAt(0).getSourceAsMap().get("key4").toString(), equalTo("30"));
     }
-
 }