Watcher: Remove async code in sync code block (elastic/elasticsearch#4506)

Watcher: Remove async code in sync code block

When removing the watch store this code snippet sneaked in.
A call to get a watch in order to find out if it exists was async, but
the code checking the result was called immediately afterwards without
waiting for the result, thus always using the default value.

This also removes some unused code in TriggeredWatchStore.

Original commit: elastic/x-pack-elasticsearch@c47e70bf8a
This commit is contained in:
Alexander Reelsen 2017-01-05 10:24:17 +01:00 committed by GitHub
parent c64ad22579
commit cb7f916485
3 changed files with 51 additions and 42 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.watcher.Watcher;
@ -262,10 +263,13 @@ public final class ExecutionService extends AbstractComponent {
logger.trace("not executing watch [{}] because it is already queued", ctx.watch().id());
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
} else {
final AtomicBoolean watchExists = new AtomicBoolean(true);
client.getWatch(ctx.watch().id(), ActionListener.wrap((r) -> watchExists.set(r.isExists()), (e) -> watchExists.set(false)));
boolean watchExists = false;
try {
GetResponse response = client.getWatch(ctx.watch().id());
watchExists = response.isExists();
} catch (IndexNotFoundException e) {}
if (ctx.knownWatch() && watchExists.get() == false) {
if (ctx.knownWatch() && watchExists == false) {
// fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);

View File

@ -92,50 +92,12 @@ public class TriggeredWatchStore extends AbstractComponent {
}
}
public void put(TriggeredWatch triggeredWatch) throws Exception {
ensureStarted();
accessLock.lock();
try {
IndexRequest request = new IndexRequest(INDEX_NAME, DOC_TYPE, triggeredWatch.id().value())
.source(XContentFactory.jsonBuilder().value(triggeredWatch))
.opType(IndexRequest.OpType.CREATE);
client.index(request, (TimeValue) null);
} catch (IOException e) {
throw ioException("failed to persist triggered watch [{}]", e, triggeredWatch);
} finally {
accessLock.unlock();
}
}
public void put(final TriggeredWatch triggeredWatch, final ActionListener<Boolean> listener) {
ensureStarted();
try {
IndexRequest request = new IndexRequest(INDEX_NAME, DOC_TYPE, triggeredWatch.id().value())
.source(XContentFactory.jsonBuilder().value(triggeredWatch))
.opType(IndexRequest.OpType.CREATE);
client.index(request, ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
} catch (IOException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("could not index triggered watch [{}], ignoring it...",
triggeredWatch.id()), e);
}
}
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BitSet> listener) {
if (triggeredWatches.isEmpty()) {
listener.onResponse(new BitSet(0));
return;
}
if (triggeredWatches.size() == 1) {
put(triggeredWatches.get(0), ActionListener.wrap(success -> {
BitSet bitSet = new BitSet(1);
bitSet.set(0);
listener.onResponse(bitSet);
}, listener::onFailure));
return;
}
ensureStarted();
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
@ -163,6 +125,10 @@ public class TriggeredWatchStore extends AbstractComponent {
}, listener::onFailure));
}
public void put(TriggeredWatch triggeredWatch) throws Exception {
putAll(Collections.singletonList(triggeredWatch));
}
public BitSet putAll(final List<TriggeredWatch> triggeredWatches) throws Exception {
ensureStarted();
try {
@ -268,5 +234,4 @@ public class TriggeredWatchStore extends AbstractComponent {
throw illegalState("unable to persist triggered watches, the store is not ready");
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.support.clock.ClockMock;
@ -798,6 +799,45 @@ public class ExecutionServiceTests extends ESTestCase {
verify(ctx).abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED), eq("Watch is already queued in thread pool"));
}
public void testExecuteWatchNotFound() throws Exception {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(ctx.knownWatch()).thenReturn(true);
when(ctx.watch()).thenReturn(watch);
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(false);
boolean exceptionThrown = false;
if (randomBoolean()) {
when(client.getWatch("_id")).thenReturn(getResponse);
} else {
// this emulates any failure while getting the watch, while index not found is an accepted issue
if (randomBoolean()) {
exceptionThrown = true;
ElasticsearchException e = new ElasticsearchException("something went wrong, i.e. index not found");
when(client.getWatch("_id")).thenThrow(e);
WatchExecutionResult result = new WatchExecutionResult(ctx, randomInt(10));
WatchRecord wr = new WatchRecord.ExceptionWatchRecord(ctx, result, e);
when(ctx.abortFailedExecution(eq(e))).thenReturn(wr);
} else {
when(client.getWatch("_id")).thenThrow(new IndexNotFoundException(".watch"));
}
}
WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class);
when(record.state()).thenReturn(ExecutionState.NOT_EXECUTED_WATCH_MISSING);
when(ctx.abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_WATCH_MISSING), any())).thenReturn(record);
when(ctx.executionPhase()).thenReturn(ExecutionPhase.AWAITS_EXECUTION);
WatchRecord watchRecord = executionService.execute(ctx);
if (exceptionThrown) {
assertThat(watchRecord.state(), is(ExecutionState.FAILED));
} else {
assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING));
}
}
private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true);