Merge branch 'master' into fix/session-timeout-warning-system-api

Original commit: elastic/x-pack-elasticsearch@13680956c4
This commit is contained in:
Lukas Olson 2016-09-07 11:22:36 -07:00
commit a1d8967d62
11 changed files with 291 additions and 155 deletions

View File

@ -30,6 +30,7 @@ import java.net.URLDecoder;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
@ -166,7 +167,7 @@ public class HttpRequest implements ToXContent {
HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout); HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout);
} }
if (proxy != null) { if (proxy != null) {
builder.field(Field.PROXY.getPreferredName(), proxy); proxy.toXContent(builder, params);
} }
return builder.endObject(); return builder.endObject();
} }
@ -195,19 +196,7 @@ public class HttpRequest implements ToXContent {
@Override @Override
public int hashCode() { public int hashCode() {
int result = host.hashCode(); return Objects.hash(host, port, scheme, method, path, params, headers, auth, connectionTimeout, readTimeout, body, proxy);
result = 31 * result + port;
result = 31 * result + scheme.hashCode();
result = 31 * result + method.hashCode();
result = 31 * result + (path != null ? path.hashCode() : 0);
result = 31 * result + params.hashCode();
result = 31 * result + headers.hashCode();
result = 31 * result + (auth != null ? auth.hashCode() : 0);
result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0);
result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0);
result = 31 * result + (body != null ? body.hashCode() : 0);
result = 31 * result + (proxy != null ? proxy.hashCode() : 0);
return result;
} }
@Override @Override

View File

@ -5,7 +5,9 @@
*/ */
package org.elasticsearch.xpack.common.http; package org.elasticsearch.xpack.common.http;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -14,6 +16,10 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.common.http.HttpRequest; import org.elasticsearch.xpack.common.http.HttpRequest;
import org.elasticsearch.xpack.common.http.Scheme; import org.elasticsearch.xpack.common.http.Scheme;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry; import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -65,6 +71,54 @@ public class HttpRequestTests extends ESTestCase {
} }
} }
public void testXContentSerialization() throws Exception {
final HttpRequest.Builder builder;
if (randomBoolean()) {
builder = HttpRequest.builder();
builder.fromUrl("http://localhost:9200/generic/createevent");
} else {
builder = HttpRequest.builder("localhost", 9200);
if (randomBoolean()) {
builder.scheme(randomFrom(Scheme.values()));
if (usually()) {
builder.path(randomAsciiOfLength(50));
}
}
}
if (usually()) {
builder.method(randomFrom(HttpMethod.values()));
}
if (randomBoolean()) {
builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10));
if (randomBoolean()) {
builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10));
}
}
if (randomBoolean()) {
builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10));
if (randomBoolean()) {
builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10));
}
}
if (randomBoolean()) {
builder.auth(new BasicAuth(randomAsciiOfLength(10), randomAsciiOfLength(20).toCharArray()));
}
if (randomBoolean()) {
builder.body(randomAsciiOfLength(200));
}
if (randomBoolean()) {
builder.connectionTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting"));
}
if (randomBoolean()) {
builder.readTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting"));
}
if (randomBoolean()) {
builder.proxy(new HttpProxy(randomAsciiOfLength(10), randomIntBetween(1024, 65000)));
}
builder.build().toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
}
private void assertThatManualBuilderEqualsParsingFromUrl(String url, HttpRequest.Builder builder) throws Exception { private void assertThatManualBuilderEqualsParsingFromUrl(String url, HttpRequest.Builder builder) throws Exception {
XContentBuilder urlContentBuilder = jsonBuilder().startObject().field("url", url).endObject(); XContentBuilder urlContentBuilder = jsonBuilder().startObject().field("url", url).endObject();
XContentParser urlContentParser = JsonXContent.jsonXContent.createParser(urlContentBuilder.bytes()); XContentParser urlContentParser = JsonXContent.jsonXContent.createParser(urlContentBuilder.bytes());

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
if (!event.localNodeMaster()) { if (!event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED) { if (watcherService.state() == WatcherState.STARTED) {
// to avoid unnecessary forking of threads... // We're no longer the master so we need to stop the watcher.
return; // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false));
} }
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
stop(false);
}
});
} else { } else {
if (watcherService.state() != WatcherState.STOPPED) { if (watcherService.state() == WatcherState.STOPPED) {
// to avoid unnecessary forking of threads... final ClusterState state = event.state();
return; threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false));
} } else {
boolean isWatchIndexDeleted = event.indicesDeleted().stream()
.filter(index -> WatchStore.INDEX.equals(index.getName()))
.findAny()
.isPresent();
final ClusterState state = event.state(); boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) &&
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN;
@Override boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) &&
public void run() { event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE;
start(state, false); boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState;
if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed());
} }
}); }
} }
} }

View File

@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService; import org.elasticsearch.xpack.watcher.watch.WatchLockService;
@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent {
innerMap.putAll(watchStore.usageStats()); innerMap.putAll(watchStore.usageStats());
return innerMap; return innerMap;
} }
/**
* Something deleted or closed the {@link WatchStore#INDEX} and thus we need to do some cleanup to prevent further execution of watches
* as those watches cannot be updated anymore
*/
public void watchIndexDeletedOrClosed() {
watchStore.clearWatchesInMemory();
executionService.clearExecutions();
}
} }

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.stats.Counters; import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.Watcher;
@ -60,13 +61,14 @@ public class ExecutionService extends AbstractComponent {
private final Clock clock; private final Clock clock;
private final TimeValue defaultThrottlePeriod; private final TimeValue defaultThrottlePeriod;
private final TimeValue maxStopTimeout; private final TimeValue maxStopTimeout;
private final ThreadPool threadPool;
private volatile CurrentExecutions currentExecutions = null; private volatile CurrentExecutions currentExecutions = null;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
@Inject @Inject
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
WatchStore watchStore, WatchLockService watchLockService, Clock clock) { WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) {
super(settings); super(settings);
this.historyStore = historyStore; this.historyStore = historyStore;
this.triggeredWatchStore = triggeredWatchStore; this.triggeredWatchStore = triggeredWatchStore;
@ -76,6 +78,7 @@ public class ExecutionService extends AbstractComponent {
this.clock = clock; this.clock = clock;
this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings);
this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings);
this.threadPool = threadPool;
} }
public void start(ClusterState state) throws Exception { public void start(ClusterState state) throws Exception {
@ -141,12 +144,7 @@ public class ExecutionService extends AbstractComponent {
currentExecutions.add(watchExecution.createSnapshot()); currentExecutions.add(watchExecution.createSnapshot());
} }
// Lets show the longest running watch first: // Lets show the longest running watch first:
Collections.sort(currentExecutions, new Comparator<WatchExecutionSnapshot>() { Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime));
@Override
public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) {
return e1.executionTime().compareTo(e2.executionTime());
}
});
return currentExecutions; return currentExecutions;
} }
@ -163,12 +161,8 @@ public class ExecutionService extends AbstractComponent {
queuedWatches.add(new QueuedWatch(executionTask.ctx)); queuedWatches.add(new QueuedWatch(executionTask.ctx));
} }
// Lets show the execution that pending the longest first: // Lets show the execution that pending the longest first:
Collections.sort(queuedWatches, new Comparator<QueuedWatch>() {
@Override Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime));
public int compare(QueuedWatch e1, QueuedWatch e2) {
return e1.executionTime().compareTo(e2.executionTime());
}
});
return queuedWatches; return queuedWatches;
} }
@ -332,20 +326,36 @@ public class ExecutionService extends AbstractComponent {
thread pool that executes the watches is completely busy, we don't lose the fact that the watch was thread pool that executes the watches is completely busy, we don't lose the fact that the watch was
triggered (it'll have its history record) triggered (it'll have its history record)
*/ */
private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) {
private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception {
try { try {
executor.execute(new WatchExecutionTask(ctx)); executor.execute(new WatchExecutionTask(ctx));
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; // we are still in the transport thread here most likely, so we cannot run heavy operations
logger.debug("{}", message); // this means some offloading needs to be done for indexing into the history and delete the triggered watches entry
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); threadPool.generic().execute(() -> {
if (ctx.overrideRecordOnConflict()) { String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
historyStore.forcePut(record); logger.debug("{}", message);
} else { WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message);
historyStore.put(record); try {
} if (ctx.overrideRecordOnConflict()) {
triggeredWatchStore.delete(triggeredWatch.id()); historyStore.forcePut(record);
} else {
historyStore.put(record);
}
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection",
triggeredWatch.id()), exc);
}
try {
triggeredWatchStore.delete(triggeredWatch.id());
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " +
"rejection", triggeredWatch.id()), exc);
}
});
} }
} }
@ -438,6 +448,15 @@ public class ExecutionService extends AbstractComponent {
return counters.toMap(); return counters.toMap();
} }
/**
* This clears out the current executions and sets new empty current executions
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
*/
public void clearExecutions() {
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
currentExecutions = new CurrentExecutions();
}
private static final class StartupExecutionContext extends TriggeredExecutionContext { private static final class StartupExecutionContext extends TriggeredExecutionContext {
public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {

View File

@ -222,7 +222,7 @@ public class TriggeredWatchStore extends AbstractComponent {
} }
} }
public void delete(Wid wid) throws Exception { public void delete(Wid wid) {
ensureStarted(); ensureStarted();
accessLock.lock(); accessLock.lock();
try { try {

View File

@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent {
} }
} }
public void clearWatchesInMemory() {
watches.clear();
}
public class WatchPut { public class WatchPut {
private final Watch previous; private final Watch previous;

View File

@ -5,18 +5,22 @@
*/ */
package org.elasticsearch.xpack.watcher; package org.elasticsearch.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.junit.Before; import org.junit.Before;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, never()).start(any(ClusterState.class)); verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop(); verify(watcherService, never()).stop();
} }
public void testWatchIndexDeletion() throws Exception {
DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build();
// old cluster state that contains watcher index
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX)
.settings(indexSettings).numberOfReplicas(0).numberOfShards(1)))
.nodes(discoveryNodes).build();
// new cluster state that does not contain watcher index
ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
verify(watcherService, times(1)).watchIndexDeletedOrClosed();
}
public void testWatchIndexClosing() throws Exception {
DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build();
// old cluster state that contains watcher index
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX)
.settings(indexSettings).numberOfReplicas(0).numberOfShards(1)))
.nodes(discoveryNodes).build();
// new cluster state with a closed watcher index
ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE)
.settings(indexSettings).numberOfReplicas(0).numberOfShards(1)))
.nodes(discoveryNodes).build();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
verify(watcherService, times(1)).watchIndexDeletedOrClosed();
}
} }

View File

@ -5,10 +5,13 @@
*/ */
package org.elasticsearch.xpack.watcher.execution; package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.Action;
@ -41,7 +44,9 @@ import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -51,7 +56,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTime.now;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -68,15 +75,16 @@ public class ExecutionServiceTests extends ESTestCase {
private Input.Result inputResult; private Input.Result inputResult;
private WatchStore watchStore; private WatchStore watchStore;
private TriggeredWatchStore triggeredWatchStore;
private WatchExecutor executor;
private HistoryStore historyStore; private HistoryStore historyStore;
private WatchLockService watchLockService; private WatchLockService watchLockService;
private ExecutionService executionService; private ExecutionService executionService;
private Clock clock; private Clock clock;
private ThreadPool threadPool;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
TriggeredWatchStore triggeredWatchStore;
payload = mock(Payload.class); payload = mock(Payload.class);
input = mock(ExecutableInput.class); input = mock(ExecutableInput.class);
inputResult = mock(Input.Result.class); inputResult = mock(Input.Result.class);
@ -88,13 +96,14 @@ public class ExecutionServiceTests extends ESTestCase {
triggeredWatchStore = mock(TriggeredWatchStore.class); triggeredWatchStore = mock(TriggeredWatchStore.class);
historyStore = mock(HistoryStore.class); historyStore = mock(HistoryStore.class);
WatchExecutor executor = mock(WatchExecutor.class); executor = mock(WatchExecutor.class);
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
watchLockService = mock(WatchLockService.class); watchLockService = mock(WatchLockService.class);
clock = new ClockMock(); clock = new ClockMock();
threadPool = mock(ThreadPool.class);
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore,
watchLockService, clock); watchLockService, clock, threadPool);
ClusterState clusterState = mock(ClusterState.class); ClusterState clusterState = mock(ClusterState.class);
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>());
@ -483,7 +492,7 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteInner() throws Exception { public void testExecuteInner() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -560,7 +569,7 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteInnerThrottled() throws Exception { public void testExecuteInnerThrottled() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -613,7 +622,7 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteInnerConditionNotMet() throws Exception { public void testExecuteInnerConditionNotMet() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -774,6 +783,28 @@ public class ExecutionServiceTests extends ESTestCase {
verify(action, never()).execute("_action", context, payload); verify(action, never()).execute("_action", context, payload);
} }
public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("foo");
when(watch.nonce()).thenReturn(1L);
when(watchStore.get(any())).thenReturn(watch);
// execute needs to fail as well as storing the history
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any());
Wid wid = new Wid(watch.id(), watch.nonce(), now());
Executor currentThreadExecutor = command -> command.run();
when(threadPool.generic()).thenReturn(currentThreadExecutor);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now()));
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
verify(triggeredWatchStore, times(1)).delete(wid);
verify(historyStore, times(1)).forcePut(any(WatchRecord.class));
}
private Tuple<ExecutableCondition, Condition.Result> whenCondition(final WatchExecutionContext context) { private Tuple<ExecutableCondition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class); Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true); when(conditionResult.met()).thenReturn(true);
@ -791,5 +822,4 @@ public class ExecutionServiceTests extends ESTestCase {
return new Tuple<>(transform, transformResult); return new Tuple<>(transform, transformResult);
} }
} }

View File

@ -108,7 +108,7 @@ public class WatcherUtilsTests extends ESTestCase {
} }
String text = randomAsciiOfLengthBetween(1, 5); String text = randomAsciiOfLengthBetween(1, 5);
ScriptService.ScriptType scriptType = randomFrom(ScriptService.ScriptType.values()); ScriptService.ScriptType scriptType = randomFrom(ScriptService.ScriptType.values());
expectedTemplate = new Script(text, scriptType, randomBoolean() ? null : "mustache", params); expectedTemplate = new Script(text, scriptType, "mustache", params);
request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType, request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType,
expectedIndicesOptions, expectedTemplate); expectedIndicesOptions, expectedTemplate);
} else { } else {

View File

@ -54,6 +54,7 @@ import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartRefreshFailed() { public void testStartRefreshFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); createWatchIndexMetaData(csBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 0); RefreshResponse refreshResponse = mockRefreshResponse(1, 0);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartSearchFailed() { public void testStartSearchFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); createWatchIndexMetaData(csBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 1); RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartNoWatchStored() throws Exception { public void testStartNoWatchStored() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); createWatchIndexMetaData(csBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 1); RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartWatchStored() throws Exception { public void testStartWatchStored() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); createWatchIndexMetaData(csBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 1); RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase {
public void testUsageStats() throws Exception { public void testUsageStats() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); createWatchIndexMetaData(csBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 1); RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase {
assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0))); assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0)));
} }
public void testThatCleaningWatchesWorks() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
BytesReference source = new BytesArray("{}");
InternalSearchHit hit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap());
hit.sourceRef(source);
SearchResponse searchResponse = mockSearchResponse(1, 1, 1, hit);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse);
SearchResponse finalSearchResponse = mockSearchResponse(1, 1, 0);
when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(finalSearchResponse);
Watch watch = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
when(watch.status()).thenReturn(status);
when(parser.parse("_id1", true, source)).thenReturn(watch);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches(), hasSize(1));
watchStore.clearWatchesInMemory();
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches(), hasSize(0));
assertThat(watchStore.activeWatches(), hasSize(0));
}
/*
* Creates the standard cluster state metadata for the watches index
* with shards/replicas being marked as started
*/
private void createWatchIndexMetaData(ClusterState.Builder builder) {
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
builder.metaData(metaDateBuilder);
builder.routingTable(routingTableBuilder.build());
}
private RefreshResponse mockRefreshResponse(int total, int successful) { private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class); RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total); when(refreshResponse.getTotalShards()).thenReturn(total);
@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase {
when(searchResponse.getTotalShards()).thenReturn(total); when(searchResponse.getTotalShards()).thenReturn(total);
when(searchResponse.getSuccessfulShards()).thenReturn(successful); when(searchResponse.getSuccessfulShards()).thenReturn(successful);
when(searchResponse.getHits()).thenReturn(internalSearchHits); when(searchResponse.getHits()).thenReturn(internalSearchHits);
when(searchResponse.getHits()).thenReturn(internalSearchHits);
return searchResponse; return searchResponse;
} }