cleanup indexing operation listener

This commit is contained in:
Areek Zillur 2016-10-25 09:33:04 -04:00
parent 168946ad5a
commit bb785483ae
7 changed files with 37 additions and 68 deletions

View File

@ -536,16 +536,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
} }
public Engine.IndexResult index(Engine.Index index) { public Engine.IndexResult index(final Engine.Index index) {
ensureWriteAllowed(index); ensureWriteAllowed(index);
Engine engine = getEngine(); Engine engine = getEngine();
return index(engine, index); return index(engine, index);
} }
private Engine.IndexResult index(Engine engine, Engine.Index index) { private Engine.IndexResult index(final Engine engine, final Engine.Index index) {
active.set(true); active.set(true);
final Engine.IndexResult result; final Engine.IndexResult result;
index = indexingOperationListeners.preIndex(index); indexingOperationListeners.preIndex(index);
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
@ -585,16 +585,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime); return new Engine.Delete(type, id, uid, version, versionType, origin, startTime);
} }
public Engine.DeleteResult delete(Engine.Delete delete) { public Engine.DeleteResult delete(final Engine.Delete delete) {
ensureWriteAllowed(delete); ensureWriteAllowed(delete);
Engine engine = getEngine(); Engine engine = getEngine();
return delete(engine, delete); return delete(engine, delete);
} }
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { private Engine.DeleteResult delete(final Engine engine, final Engine.Delete delete) {
active.set(true); active.set(true);
final Engine.DeleteResult result; final Engine.DeleteResult result;
delete = indexingOperationListeners.preDelete(delete); indexingOperationListeners.preDelete(delete);
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text()); logger.trace("delete [{}]", delete.uid().text());

View File

@ -30,44 +30,25 @@ import java.util.List;
*/ */
public interface IndexingOperationListener { public interface IndexingOperationListener {
/** /** Called before the indexing occurs */
* Called before the indexing occurs. default void preIndex(Engine.Index operation) {}
*/
default Engine.Index preIndex(Engine.Index operation) {
return operation;
}
/** /** Called after the indexing operation occurred */
* Called after the indexing operation occurred.
*/
default void postIndex(Engine.Index index, Engine.IndexResult result) {} default void postIndex(Engine.Index index, Engine.IndexResult result) {}
/** /** Called after the indexing operation occurred with exception */
* Called after the indexing operation occurred with exception.
*/
default void postIndex(Engine.Index index, Exception ex) {} default void postIndex(Engine.Index index, Exception ex) {}
/** /** Called before the delete occurs */
* Called before the delete occurs. default void preDelete(Engine.Delete delete) {}
*/
default Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
/** Called after the delete operation occurred */
/**
* Called after the delete operation occurred.
*/
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {} default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
/** /** Called after the delete operation occurred with exception */
* Called after the delete operation occurred with exception.
*/
default void postDelete(Engine.Delete delete, Exception ex) {} default void postDelete(Engine.Delete delete, Exception ex) {}
/** /** A Composite listener that multiplexes calls to each of the listeners methods */
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeListener implements IndexingOperationListener{ final class CompositeListener implements IndexingOperationListener{
private final List<IndexingOperationListener> listeners; private final List<IndexingOperationListener> listeners;
private final Logger logger; private final Logger logger;
@ -78,7 +59,7 @@ public interface IndexingOperationListener {
} }
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
assert operation != null; assert operation != null;
for (IndexingOperationListener listener : listeners) { for (IndexingOperationListener listener : listeners) {
try { try {
@ -87,12 +68,11 @@ public interface IndexingOperationListener {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e); logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
} }
} }
return operation;
} }
@Override @Override
public void postIndex(Engine.Index index, Engine.IndexResult result) { public void postIndex(Engine.Index index, Engine.IndexResult result) {
assert index != null; assert index != null && result != null;
for (IndexingOperationListener listener : listeners) { for (IndexingOperationListener listener : listeners) {
try { try {
listener.postIndex(index, result); listener.postIndex(index, result);
@ -116,7 +96,7 @@ public interface IndexingOperationListener {
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
assert delete != null; assert delete != null;
for (IndexingOperationListener listener : listeners) { for (IndexingOperationListener listener : listeners) {
try { try {
@ -125,12 +105,11 @@ public interface IndexingOperationListener {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e); logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
} }
} }
return delete;
} }
@Override @Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
assert delete != null; assert delete != null && result != null;
for (IndexingOperationListener listener : listeners) { for (IndexingOperationListener listener : listeners) {
try { try {
listener.postDelete(delete, result); listener.postDelete(delete, result);

View File

@ -65,12 +65,11 @@ final class InternalIndexingStats implements IndexingOperationListener {
} }
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
if (!operation.origin().isRecovery()) { if (!operation.origin().isRecovery()) {
totalStats.indexCurrent.inc(); totalStats.indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc(); typeStats(operation.type()).indexCurrent.inc();
} }
return operation;
} }
@Override @Override
@ -96,13 +95,11 @@ final class InternalIndexingStats implements IndexingOperationListener {
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
if (!delete.origin().isRecovery()) { if (!delete.origin().isRecovery()) {
totalStats.deleteCurrent.inc(); totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc();
} }
return delete;
} }
@Override @Override

View File

@ -233,9 +233,8 @@ public class IndexModuleTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean(false); AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() { IndexingOperationListener listener = new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
executed.set(true); executed.set(true);
return operation;
} }
}; };
module.addIndexOperationListener(listener); module.addIndexOperationListener(listener);

View File

@ -558,9 +558,8 @@ public class IndexShardTests extends IndexShardTestCase {
shard.close("simon says", true); shard.close("simon says", true);
shard = reinitShard(shard, new IndexingOperationListener() { shard = reinitShard(shard, new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
preIndex.incrementAndGet(); preIndex.incrementAndGet();
return operation;
} }
@Override @Override
@ -578,9 +577,8 @@ public class IndexShardTests extends IndexShardTestCase {
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
preDelete.incrementAndGet(); preDelete.incrementAndGet();
return delete;
} }
@Override @Override
@ -1121,9 +1119,8 @@ public class IndexShardTests extends IndexShardTestCase {
final AtomicInteger postDelete = new AtomicInteger(); final AtomicInteger postDelete = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() { IndexingOperationListener listener = new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
preIndex.incrementAndGet(); preIndex.incrementAndGet();
return operation;
} }
@Override @Override
@ -1132,9 +1129,8 @@ public class IndexShardTests extends IndexShardTestCase {
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
preDelete.incrementAndGet(); preDelete.incrementAndGet();
return delete;
} }
@Override @Override

View File

@ -40,9 +40,8 @@ public class IndexingOperationListenerTests extends ESTestCase{
AtomicInteger postDeleteException = new AtomicInteger(); AtomicInteger postDeleteException = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() { IndexingOperationListener listener = new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
preIndex.incrementAndGet(); preIndex.incrementAndGet();
return operation;
} }
@Override @Override
@ -56,9 +55,8 @@ public class IndexingOperationListenerTests extends ESTestCase{
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
preDelete.incrementAndGet(); preDelete.incrementAndGet();
return delete;
} }
@Override @Override
@ -74,7 +72,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
IndexingOperationListener throwingListener = new IndexingOperationListener() { IndexingOperationListener throwingListener = new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public void preIndex(Engine.Index operation) {
throw new RuntimeException(); throw new RuntimeException();
} }
@ -89,7 +87,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
throw new RuntimeException(); throw new RuntimeException();
} }

View File

@ -199,23 +199,23 @@ public class CancelTests extends ReindexTestCase {
public static class BlockingOperationListener implements IndexingOperationListener { public static class BlockingOperationListener implements IndexingOperationListener {
@Override @Override
public Engine.Index preIndex(Engine.Index index) { public void preIndex(Engine.Index index) {
return preCheck(index, index.type()); preCheck(index, index.type());
} }
@Override @Override
public Engine.Delete preDelete(Engine.Delete delete) { public void preDelete(Engine.Delete delete) {
return preCheck(delete, delete.type()); preCheck(delete, delete.type());
} }
private <T extends Engine.Operation> T preCheck(T operation, String type) { private void preCheck(Engine.Operation operation, String type) {
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
return operation; return;
} }
try { try {
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
return operation; return;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);