diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 0b979ffcac8..3cfe29103b8 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.bloom.BloomCache; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; +import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.merge.policy.EnableMergePolicy; import org.elasticsearch.index.merge.policy.MergePolicyProvider; @@ -95,6 +96,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final ThreadPool threadPool; + private final ShardIndexingService indexingService; + private final IndexSettingsService indexSettingsService; @Nullable @@ -158,7 +161,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, - IndexSettingsService indexSettingsService, @Nullable IndicesWarmer warmer, + IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService, @@ -176,6 +179,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { this.threadPool = threadPool; this.indexSettingsService = indexSettingsService; + this.indexingService = indexingService; this.warmer = (InternalIndicesWarmer) warmer; this.store = store; this.deletionPolicy = deletionPolicy; @@ -471,6 +475,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { Translog.Location translogLocation = translog.add(new Translog.Create(create)); versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); + + indexingService.postCreateUnderLock(create); } } @@ -583,6 +589,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { Translog.Location translogLocation = translog.add(new Translog.Index(index)); versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); + + indexingService.postIndexUnderLock(index); } } @@ -685,6 +693,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); } + + indexingService.postDeleteUnderLock(delete); } } diff --git a/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java index 790c79b717e..9c0eb5305d5 100644 --- a/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java +++ b/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java @@ -22,30 +22,78 @@ package org.elasticsearch.index.indexing; import org.elasticsearch.index.engine.Engine; /** - * + * An indexing listener for indexing, delete, events. */ public abstract class IndexingOperationListener { + /** + * Called before the indexing occurs. + */ public Engine.Create preCreate(Engine.Create create) { return create; } + /** + * Called after the indexing occurs, under a locking scheme to maintain + * concurrent updates to the same doc. + *
+ * Note, long operations should not occur under this callback. + */ + public void postCreateUnderLock(Engine.Create create) { + + } + + /** + * Called after the indexing operation occurred. + */ public void postCreate(Engine.Create create) { } + /** + * Called before the indexing occurs. + */ public Engine.Index preIndex(Engine.Index index) { return index; } + /** + * Called after the indexing occurs, under a locking scheme to maintain + * concurrent updates to the same doc. + * + * Note, long operations should not occur under this callback. + */ + public void postIndexUnderLock(Engine.Index index) { + + } + + /** + * Called after the indexing operation occurred. + */ public void postIndex(Engine.Index index) { } + /** + * Called before the delete occurs. + */ public Engine.Delete preDelete(Engine.Delete delete) { return delete; } + /** + * Called after the delete occurs, under a locking scheme to maintain + * concurrent updates to the same doc. + * + * Note, long operations should not occur under this callback. + */ + public void postDeleteUnderLock(Engine.Delete delete) { + + } + + /** + * Called after the delete operation occurred. + */ public void postDelete(Engine.Delete delete) { } diff --git a/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java index 12060c40c0c..79020d43ad7 100644 --- a/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java @@ -103,13 +103,29 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return create; } + public void postCreateUnderLock(Engine.Create create) { + if (listeners != null) { + for (IndexingOperationListener listener : listeners) { + try { + listener.postCreateUnderLock(create); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } + } + } + } + public void postCreate(Engine.Create create) { long took = create.endTime() - create.startTime(); totalStats.indexMetric.inc(took); typeStats(create.type()).indexMetric.inc(took); if (listeners != null) { for (IndexingOperationListener listener : listeners) { - listener.postCreate(create); + try { + listener.postCreate(create); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } } } } @@ -125,6 +141,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return index; } + public void postIndexUnderLock(Engine.Index index) { + if (listeners != null) { + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndexUnderLock(index); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } + } + } + } + public void postIndex(Engine.Index index) { long took = index.endTime() - index.startTime(); totalStats.indexMetric.inc(took); @@ -134,7 +162,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { typeStats.indexCurrent.dec(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { - listener.postIndex(index); + try { + listener.postIndex(index); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } } } } @@ -155,6 +187,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return delete; } + public void postDeleteUnderLock(Engine.Delete delete) { + if (listeners != null) { + for (IndexingOperationListener listener : listeners) { + try { + listener.postDeleteUnderLock(delete); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } + } + } + } + public void postDelete(Engine.Delete delete) { long took = delete.endTime() - delete.startTime(); totalStats.deleteMetric.inc(took); @@ -164,7 +208,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { typeStats.deleteCurrent.dec(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { - listener.postDelete(delete); + try { + listener.postDelete(delete); + } catch (Exception e) { + logger.warn("post listener [{}] failed", e, listener); + } } } } diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java index 15c0450a0eb..d1150991d94 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java @@ -223,7 +223,7 @@ public class PercolatorExecutor extends AbstractIndexComponent { String currentFieldName = null; XContentParser.Token token = parser.nextToken(); // move the START_OBJECT if (token != XContentParser.Token.START_OBJECT) { - throw new ElasticSearchException("Failed to add query [" + name + "], not starting with OBJECT"); + throw new ElasticSearchException("failed to parse query [" + name + "], not starting with OBJECT"); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -240,8 +240,8 @@ public class PercolatorExecutor extends AbstractIndexComponent { } } return query; - } catch (IOException e) { - throw new ElasticSearchException("Failed to add query [" + name + "]", e); + } catch (Exception e) { + throw new ElasticSearchException("failed to parse query [" + name + "]", e); } finally { if (parser != null) { parser.close(); diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java index a3123120207..0c7c4345108 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java @@ -257,26 +257,44 @@ public class PercolatorService extends AbstractIndexComponent { @Override public Engine.Create preCreate(Engine.Create create) { + // validate the query here, before we index if (create.type().equals(index().name())) { - percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength()); + percolator.parseQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength()); } return create; } + @Override + public void postCreateUnderLock(Engine.Create create) { + // add the query under a doc lock + if (create.type().equals(index().name())) { + percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength()); + } + } + @Override public Engine.Index preIndex(Engine.Index index) { + // validate the query here, before we index if (index.type().equals(index().name())) { - percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength()); + percolator.parseQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength()); } return index; } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void postIndexUnderLock(Engine.Index index) { + // add the query under a doc lock + if (index.type().equals(index().name())) { + percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength()); + } + } + + @Override + public void postDeleteUnderLock(Engine.Delete delete) { + // remove the query under a lock if (delete.type().equals(index().name())) { percolator.removeQuery(delete.id()); } - return delete; } } } diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java index fd0e2b7819e..18a8c9e5b82 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.bloom.none.NoneBloomCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.robin.RobinEngine; +import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; @@ -38,7 +39,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_ public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { protected Engine createEngine(Store store, Translog translog) { - return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), + return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); } }