From 2fe2c7fef87aff15e14943a3b07235bd6d6da851 Mon Sep 17 00:00:00 2001 From: Ivannikov Kirill Date: Tue, 1 Sep 2015 12:45:40 +0500 Subject: [PATCH] Add listeners to postCreate etc --- .../indexing/IndexingOperationListener.java | 16 ++++++- .../index/indexing/ShardIndexingService.java | 45 +++++++++++++++---- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java index ba968172c98..bb4c109e6af 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java @@ -43,12 +43,19 @@ public abstract class IndexingOperationListener { } /** - * Called after the indexing operation occurred. + * Called after create index operation occurred. */ public void postCreate(Engine.Create create) { } + /** + * Called after create index operation occurred with exception. + */ + public void postCreate(Engine.Create create, Throwable ex) { + + } + /** * Called before the indexing occurs. */ @@ -103,4 +110,11 @@ public abstract class IndexingOperationListener { public void postDelete(Engine.Delete delete) { } + + /** + * Called after the delete operation occurred with exception. + */ + public void postDelete(Engine.Delete delete, Throwable ex) { + + } } diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java index 303c338184e..dd2e43fcc83 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java @@ -89,7 +89,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { totalStats.indexCurrent.inc(); typeStats(create.type()).indexCurrent.inc(); for (IndexingOperationListener listener : listeners) { - create = listener.preCreate(create); + try { + create = listener.preCreate(create); + } catch (Exception e) { + logger.warn("preCreate listener [{}] failed", e, listener); + } } return create; } @@ -124,19 +128,31 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postCreate(create); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postCreate listener [{}] failed", e, listener); } } } public void postCreate(Engine.Create create, Throwable ex) { + for (IndexingOperationListener listener : listeners) { + try { + listener.postCreate(create, ex); + } catch (Exception e) { + logger.warn("postCreate listener [{}] failed", e, listener); + } + } } public Engine.Index preIndex(Engine.Index index) { totalStats.indexCurrent.inc(); typeStats(index.type()).indexCurrent.inc(); for (IndexingOperationListener listener : listeners) { - index = listener.preIndex(index); + try { + listener.preIndex(index); + } catch (Exception e) { + logger.warn("preIndex listener [{}] failed", e, listener); + } + } return index; } @@ -146,7 +162,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postIndexUnderLock(index); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postIndexUnderLock listener [{}] failed", e, listener); } } } @@ -163,7 +179,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postIndex(index); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postIndex listener [{}] failed", e, listener); } } } @@ -175,7 +191,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postIndex(index, ex); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postIndex listener [{}] failed", e, listener); } } } @@ -184,7 +200,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { totalStats.deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc(); for (IndexingOperationListener listener : listeners) { - delete = listener.preDelete(delete); + try { + delete = listener.preDelete(delete); + } catch (Exception e) { + logger.warn("preDelete listener [{}] failed", e, listener); + } } return delete; } @@ -194,7 +214,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postDeleteUnderLock(delete); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postDeleteUnderLock listener [{}] failed", e, listener); } } } @@ -210,7 +230,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { try { listener.postDelete(delete); } catch (Exception e) { - logger.warn("post listener [{}] failed", e, listener); + logger.warn("postDelete listener [{}] failed", e, listener); } } } @@ -218,6 +238,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public void postDelete(Engine.Delete delete, Throwable ex) { totalStats.deleteCurrent.dec(); typeStats(delete.type()).deleteCurrent.dec(); + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete, ex); + } catch (Exception e) { + logger.warn("postDelete listener [{}] failed", e, listener); + } + } } public void noopUpdate(String type) {