Revert "cleanup indexing operation listener"
This reverts commit bb785483ae
.
This commit is contained in:
parent
c237263ad1
commit
65832b987f
|
@ -536,16 +536,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
|
return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Engine.IndexResult index(final Engine.Index index) {
|
public Engine.IndexResult index(Engine.Index index) {
|
||||||
ensureWriteAllowed(index);
|
ensureWriteAllowed(index);
|
||||||
Engine engine = getEngine();
|
Engine engine = getEngine();
|
||||||
return index(engine, index);
|
return index(engine, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Engine.IndexResult index(final Engine engine, final Engine.Index index) {
|
private Engine.IndexResult index(Engine engine, Engine.Index index) {
|
||||||
active.set(true);
|
active.set(true);
|
||||||
final Engine.IndexResult result;
|
final Engine.IndexResult result;
|
||||||
indexingOperationListeners.preIndex(index);
|
index = indexingOperationListeners.preIndex(index);
|
||||||
try {
|
try {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
|
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
|
||||||
|
@ -585,16 +585,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime);
|
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Engine.DeleteResult delete(final Engine.Delete delete) {
|
public Engine.DeleteResult delete(Engine.Delete delete) {
|
||||||
ensureWriteAllowed(delete);
|
ensureWriteAllowed(delete);
|
||||||
Engine engine = getEngine();
|
Engine engine = getEngine();
|
||||||
return delete(engine, delete);
|
return delete(engine, delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Engine.DeleteResult delete(final Engine engine, final Engine.Delete delete) {
|
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) {
|
||||||
active.set(true);
|
active.set(true);
|
||||||
final Engine.DeleteResult result;
|
final Engine.DeleteResult result;
|
||||||
indexingOperationListeners.preDelete(delete);
|
delete = indexingOperationListeners.preDelete(delete);
|
||||||
try {
|
try {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("delete [{}]", delete.uid().text());
|
logger.trace("delete [{}]", delete.uid().text());
|
||||||
|
|
|
@ -30,25 +30,44 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
public interface IndexingOperationListener {
|
public interface IndexingOperationListener {
|
||||||
|
|
||||||
/** Called before the indexing occurs */
|
/**
|
||||||
default void preIndex(Engine.Index operation) {}
|
* Called before the indexing occurs.
|
||||||
|
*/
|
||||||
|
default Engine.Index preIndex(Engine.Index operation) {
|
||||||
|
return operation;
|
||||||
|
}
|
||||||
|
|
||||||
/** Called after the indexing operation occurred */
|
/**
|
||||||
|
* Called after the indexing operation occurred.
|
||||||
|
*/
|
||||||
default void postIndex(Engine.Index index, Engine.IndexResult result) {}
|
default void postIndex(Engine.Index index, Engine.IndexResult result) {}
|
||||||
|
|
||||||
/** Called after the indexing operation occurred with exception */
|
/**
|
||||||
|
* Called after the indexing operation occurred with exception.
|
||||||
|
*/
|
||||||
default void postIndex(Engine.Index index, Exception ex) {}
|
default void postIndex(Engine.Index index, Exception ex) {}
|
||||||
|
|
||||||
/** Called before the delete occurs */
|
/**
|
||||||
default void preDelete(Engine.Delete delete) {}
|
* Called before the delete occurs.
|
||||||
|
*/
|
||||||
|
default Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
|
return delete;
|
||||||
|
}
|
||||||
|
|
||||||
/** Called after the delete operation occurred */
|
|
||||||
|
/**
|
||||||
|
* Called after the delete operation occurred.
|
||||||
|
*/
|
||||||
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
|
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
|
||||||
|
|
||||||
/** Called after the delete operation occurred with exception */
|
/**
|
||||||
|
* Called after the delete operation occurred with exception.
|
||||||
|
*/
|
||||||
default void postDelete(Engine.Delete delete, Exception ex) {}
|
default void postDelete(Engine.Delete delete, Exception ex) {}
|
||||||
|
|
||||||
/** A Composite listener that multiplexes calls to each of the listeners methods */
|
/**
|
||||||
|
* A Composite listener that multiplexes calls to each of the listeners methods.
|
||||||
|
*/
|
||||||
final class CompositeListener implements IndexingOperationListener{
|
final class CompositeListener implements IndexingOperationListener{
|
||||||
private final List<IndexingOperationListener> listeners;
|
private final List<IndexingOperationListener> listeners;
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
|
@ -59,7 +78,7 @@ public interface IndexingOperationListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
assert operation != null;
|
assert operation != null;
|
||||||
for (IndexingOperationListener listener : listeners) {
|
for (IndexingOperationListener listener : listeners) {
|
||||||
try {
|
try {
|
||||||
|
@ -68,11 +87,12 @@ public interface IndexingOperationListener {
|
||||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
|
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postIndex(Engine.Index index, Engine.IndexResult result) {
|
public void postIndex(Engine.Index index, Engine.IndexResult result) {
|
||||||
assert index != null && result != null;
|
assert index != null;
|
||||||
for (IndexingOperationListener listener : listeners) {
|
for (IndexingOperationListener listener : listeners) {
|
||||||
try {
|
try {
|
||||||
listener.postIndex(index, result);
|
listener.postIndex(index, result);
|
||||||
|
@ -96,7 +116,7 @@ public interface IndexingOperationListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
assert delete != null;
|
assert delete != null;
|
||||||
for (IndexingOperationListener listener : listeners) {
|
for (IndexingOperationListener listener : listeners) {
|
||||||
try {
|
try {
|
||||||
|
@ -105,11 +125,12 @@ public interface IndexingOperationListener {
|
||||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
|
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return delete;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
|
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
|
||||||
assert delete != null && result != null;
|
assert delete != null;
|
||||||
for (IndexingOperationListener listener : listeners) {
|
for (IndexingOperationListener listener : listeners) {
|
||||||
try {
|
try {
|
||||||
listener.postDelete(delete, result);
|
listener.postDelete(delete, result);
|
||||||
|
|
|
@ -65,11 +65,12 @@ final class InternalIndexingStats implements IndexingOperationListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
if (!operation.origin().isRecovery()) {
|
if (!operation.origin().isRecovery()) {
|
||||||
totalStats.indexCurrent.inc();
|
totalStats.indexCurrent.inc();
|
||||||
typeStats(operation.type()).indexCurrent.inc();
|
typeStats(operation.type()).indexCurrent.inc();
|
||||||
}
|
}
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,11 +96,13 @@ final class InternalIndexingStats implements IndexingOperationListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
if (!delete.origin().isRecovery()) {
|
if (!delete.origin().isRecovery()) {
|
||||||
totalStats.deleteCurrent.inc();
|
totalStats.deleteCurrent.inc();
|
||||||
typeStats(delete.type()).deleteCurrent.inc();
|
typeStats(delete.type()).deleteCurrent.inc();
|
||||||
}
|
}
|
||||||
|
return delete;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -233,8 +233,9 @@ public class IndexModuleTests extends ESTestCase {
|
||||||
AtomicBoolean executed = new AtomicBoolean(false);
|
AtomicBoolean executed = new AtomicBoolean(false);
|
||||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
executed.set(true);
|
executed.set(true);
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
module.addIndexOperationListener(listener);
|
module.addIndexOperationListener(listener);
|
||||||
|
|
|
@ -558,8 +558,9 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
shard.close("simon says", true);
|
shard.close("simon says", true);
|
||||||
shard = reinitShard(shard, new IndexingOperationListener() {
|
shard = reinitShard(shard, new IndexingOperationListener() {
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
preIndex.incrementAndGet();
|
preIndex.incrementAndGet();
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -577,8 +578,9 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
preDelete.incrementAndGet();
|
preDelete.incrementAndGet();
|
||||||
|
return delete;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1119,8 +1121,9 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
final AtomicInteger postDelete = new AtomicInteger();
|
final AtomicInteger postDelete = new AtomicInteger();
|
||||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
preIndex.incrementAndGet();
|
preIndex.incrementAndGet();
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1129,8 +1132,9 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
preDelete.incrementAndGet();
|
preDelete.incrementAndGet();
|
||||||
|
return delete;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,8 +40,9 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
||||||
AtomicInteger postDeleteException = new AtomicInteger();
|
AtomicInteger postDeleteException = new AtomicInteger();
|
||||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
preIndex.incrementAndGet();
|
preIndex.incrementAndGet();
|
||||||
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -55,8 +56,9 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
preDelete.incrementAndGet();
|
preDelete.incrementAndGet();
|
||||||
|
return delete;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -72,7 +74,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
||||||
|
|
||||||
IndexingOperationListener throwingListener = new IndexingOperationListener() {
|
IndexingOperationListener throwingListener = new IndexingOperationListener() {
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index operation) {
|
public Engine.Index preIndex(Engine.Index operation) {
|
||||||
throw new RuntimeException();
|
throw new RuntimeException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +89,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
throw new RuntimeException();
|
throw new RuntimeException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -199,23 +199,23 @@ public class CancelTests extends ReindexTestCase {
|
||||||
public static class BlockingOperationListener implements IndexingOperationListener {
|
public static class BlockingOperationListener implements IndexingOperationListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preIndex(Engine.Index index) {
|
public Engine.Index preIndex(Engine.Index index) {
|
||||||
preCheck(index, index.type());
|
return preCheck(index, index.type());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preDelete(Engine.Delete delete) {
|
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||||
preCheck(delete, delete.type());
|
return preCheck(delete, delete.type());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void preCheck(Engine.Operation operation, String type) {
|
private <T extends Engine.Operation> T preCheck(T operation, String type) {
|
||||||
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
|
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
|
||||||
return;
|
return operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
||||||
return;
|
return operation;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|
Loading…
Reference in New Issue