Simplify watcher indexing listener.(#53046)

Backport: #52627

Add watcher to trigger server after index operation has succeeded,
instead of adding a watch to trigger service before
the actual index operation has performed on the shard level.

This logic is simpler to reason about in the case that a failure
does occur during the execution of an index operation on
the shard level.

Relates to #52453, but I think doesn't fix it, but makes it easier
to debug.
This commit is contained in:
Martijn van Groningen 2020-03-03 11:01:57 +01:00 committed by GitHub
parent 844f350774
commit 510db25dd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 49 deletions

View File

@ -97,11 +97,16 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
*
* @param shardId The shard id object of the document being processed
* @param operation The index operation
* @return The index operation
* @param result The result of the operation
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResult result) {
if (isWatchDocument(shardId.getIndexName())) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
postIndex(shardId, operation, result.getFailure());
return;
}
ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
@ -109,8 +114,8 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
watch.id(), shardId, configuration.localShards.keySet());
return operation;
watch.id(), shardId, configuration.localShards.keySet());
return;
}
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
@ -128,32 +133,12 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
}
}
return operation;
}
/**
* In case of a document related failure (for example version conflict), then clean up resources for a watch
* in the trigger service.
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
assert result.getFailure() != null;
postIndex(shardId, index, result.getFailure());
}
}
/**
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
*
* TODO: If the configuration changes between preindex and postindex methods and we add a
* watch, that could not be indexed
* TODO: this watch might not be deleted from the triggerservice. Are we willing to accept this?
* TODO: This could be circumvented by using a threadlocal in preIndex(), that contains the
* watch and is cleared afterwards
* In case of an engine related error, we just log that we failed the add the watch to the trigger service.
* No need to interact with the trigger service.
*
* @param shardId The shard id object of the document being processed
* @param index The index operation
@ -162,8 +147,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
logger.debug(() -> new ParameterizedMessage("failed to add watch [{}] to trigger service", index.id()), ex);
}
}

View File

@ -114,18 +114,20 @@ public class WatcherIndexingListenerTests extends ESTestCase {
verifyZeroInteractions(parser);
}
public void testPreIndex() throws Exception {
public void testPostIndex() throws Exception {
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(result.getResultType()).thenReturn(randomFrom(types));
boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
Engine.Index returnedOperation = listener.preIndex(shardId, operation);
assertThat(returnedOperation, is(operation));
listener.postIndex(shardId, operation, result);
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
@ -140,12 +142,13 @@ public class WatcherIndexingListenerTests extends ESTestCase {
// this test emulates an index with 10 shards, and ensures that triggering only happens on a
// single shard
public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
String id = randomAlphaOfLength(10);
int totalShardCount = randomIntBetween(1, 10);
boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch(id, watchActive, isNewWatch);
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
@ -155,7 +158,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
localShards.put(shardId, new ShardAllocationConfiguration(idx, totalShardCount, Collections.emptyList()));
Configuration configuration = new Configuration(Watch.INDEX, localShards);
listener.setConfiguration(configuration);
listener.preIndex(shardId, operation);
listener.postIndex(shardId, operation, result);
}
// no matter how many shards we had, this should have been only called once
@ -187,16 +190,17 @@ public class WatcherIndexingListenerTests extends ESTestCase {
return watch;
}
public void testPreIndexCheckParsingException() throws Exception {
public void testPostIndexCheckParsingException() throws Exception {
String id = randomAlphaOfLength(10);
when(operation.id()).thenReturn(id);
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong()))
.thenThrow(new IOException("self thrown"));
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,
() -> listener.preIndex(shardId, operation));
() -> listener.postIndex(shardId, operation, result));
assertThat(exc.getMessage(), containsString("Could not parse watch"));
assertThat(exc.getMessage(), containsString(id));
}
@ -207,19 +211,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
listener.postIndex(shardId, operation, result);
verify(triggerService).remove(eq("_id"));
}
public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(randomFrom(types));
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
listener.postIndex(shardId, operation, result);
verifyZeroInteractions(triggerService);
}
@ -239,7 +230,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
verify(triggerService).remove(eq("_id"));
verifyZeroInteractions(triggerService);
}
public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {