Merge pull request #13203 from kiryam/master

Add listeners for postIndex, postCreate, and postDelete
This commit is contained in:
Simon Willnauer 2015-09-02 22:52:29 +02:00
commit d10f80aea0
3 changed files with 151 additions and 7 deletions

View File

@ -43,12 +43,19 @@ public abstract class IndexingOperationListener {
}
/**
* Called after the indexing operation occurred.
* Called after create index operation occurred.
*/
public void postCreate(Engine.Create create) {
}
/**
* Called after create index operation occurred with exception.
*/
public void postCreate(Engine.Create create, Throwable ex) {
}
/**
* Called before the indexing occurs.
*/
@ -73,6 +80,13 @@ public abstract class IndexingOperationListener {
}
/**
* Called after the indexing operation occurred with exception.
*/
public void postIndex(Engine.Index index, Throwable ex) {
}
/**
* Called before the delete occurs.
*/
@ -96,4 +110,11 @@ public abstract class IndexingOperationListener {
public void postDelete(Engine.Delete delete) {
}
/**
* Called after the delete operation occurred with exception.
*/
public void postDelete(Engine.Delete delete, Throwable ex) {
}
}

View File

@ -99,7 +99,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postCreateUnderLock(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postCreateUnderLock listener [{}] failed", e, listener);
}
}
}
@ -124,12 +124,19 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postCreate(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postCreate listener [{}] failed", e, listener);
}
}
}
public void postCreate(Engine.Create create, Throwable ex) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreate(create, ex);
} catch (Throwable t) {
logger.warn("postCreate listener [{}] failed", t, listener);
}
}
}
public Engine.Index preIndex(Engine.Index index) {
@ -146,7 +153,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postIndexUnderLock(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postIndexUnderLock listener [{}] failed", e, listener);
}
}
}
@ -163,7 +170,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postIndex(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postIndex listener [{}] failed", e, listener);
}
}
}
@ -171,6 +178,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public void postIndex(Engine.Index index, Throwable ex) {
totalStats.indexCurrent.dec();
typeStats(index.type()).indexCurrent.dec();
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, ex);
} catch (Throwable t) {
logger.warn("postIndex listener [{}] failed", t, listener);
}
}
}
public Engine.Delete preDelete(Engine.Delete delete) {
@ -187,7 +201,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postDeleteUnderLock(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postDeleteUnderLock listener [{}] failed", e, listener);
}
}
}
@ -203,7 +217,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
try {
listener.postDelete(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
logger.warn("postDelete listener [{}] failed", e, listener);
}
}
}
@ -211,6 +225,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public void postDelete(Engine.Delete delete, Throwable ex) {
totalStats.deleteCurrent.dec();
typeStats(delete.type()).deleteCurrent.dec();
for (IndexingOperationListener listener : listeners) {
try {
listener. postDelete(delete, ex);
} catch (Throwable t) {
logger.warn("postDelete listener [{}] failed", t, listener);
}
}
}
public void noopUpdate(String type) {

View File

@ -18,7 +18,10 @@
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
@ -38,6 +41,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
@ -49,6 +53,12 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.query.QueryParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;
@ -64,9 +74,12 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.EMPTY_PARAMS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -585,4 +598,93 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertTrue(xContent.contains(expectedSubSequence));
}
private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
document.add(uidField);
document.add(versionField);
return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate);
}
public void testPreIndex() throws IOException {
createIndex("testpreindex");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testpreindex");
IndexShard shard = test.shard(0);
ShardIndexingService shardIndexingService = shard.indexingService();
final AtomicBoolean preIndexCalled = new AtomicBoolean(false);
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index index) {
preIndexCalled.set(true);
return super.preIndex(index);
}
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index);
assertTrue(preIndexCalled.get());
}
public void testPostIndex() throws IOException {
createIndex("testpostindex");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testpostindex");
IndexShard shard = test.shard(0);
ShardIndexingService shardIndexingService = shard.indexingService();
final AtomicBoolean postIndexCalled = new AtomicBoolean(false);
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index) {
postIndexCalled.set(true);
super.postIndex(index);
}
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index);
assertTrue(postIndexCalled.get());
}
public void testPostIndexWithException() throws IOException {
createIndex("testpostindexwithexception");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testpostindexwithexception");
IndexShard shard = test.shard(0);
ShardIndexingService shardIndexingService = shard.indexingService();
shard.close("Unexpected close", true);
shard.state = IndexShardState.STARTED; // It will generate exception
final AtomicBoolean postIndexWithExceptionCalled = new AtomicBoolean(false);
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, Throwable ex) {
assertNotNull(ex);
postIndexWithExceptionCalled.set(true);
super.postIndex(index, ex);
}
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
try {
shard.index(index);
fail();
}catch (IllegalIndexShardStateException e){
}
assertTrue(postIndexWithExceptionCalled.get());
}
}