From 54d1e35d845bde4985d1b1557143d98f42c060c4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 9 Jan 2016 21:50:17 +0100 Subject: [PATCH] Cleanup IndexingOperationListeners infrastructure This commit reduces the former ShardIndexinService to a simple stats/metrics class, moves IndexingSlowLog to the IndexService level since it can be shared across shards of an index and is now hidden behind IndexingOperationListener. IndexingOperationListener is now a first class citizen in IndexShard and is passed in from IndexService. --- .../admin/indices/stats/CommonStats.java | 2 +- .../action/bulk/TransportShardBulkAction.java | 2 +- .../action/update/TransportUpdateAction.java | 2 +- .../elasticsearch/cluster/ClusterModule.java | 2 +- .../org/elasticsearch/index/IndexService.java | 13 +- .../index/{indexing => }/IndexingSlowLog.java | 12 +- .../index/engine/EngineConfig.java | 1 - .../index/engine/InternalEngine.java | 2 - .../indexing/IndexingOperationListener.java | 70 -------- .../elasticsearch/index/shard/IndexShard.java | 43 +++-- .../shard/IndexingOperationListener.java | 152 ++++++++++++++++ .../{indexing => shard}/IndexingStats.java | 2 +- .../InternalIndexingStats.java} | 79 +-------- .../elasticsearch/indices/IndicesService.java | 2 +- .../indices/NodeIndicesStats.java | 2 +- .../rest/action/cat/RestNodesAction.java | 2 +- .../{indexing => }/IndexingSlowLogTests.java | 4 +- .../index/shard/IndexShardTests.java | 130 ++++++++------ .../shard/IndexingOperationListenerTests.java | 162 ++++++++++++++++++ 19 files changed, 449 insertions(+), 235 deletions(-) rename core/src/main/java/org/elasticsearch/index/{indexing => }/IndexingSlowLog.java (96%) delete mode 100644 core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java create mode 100644 core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java rename core/src/main/java/org/elasticsearch/index/{indexing => shard}/IndexingStats.java (99%) rename core/src/main/java/org/elasticsearch/index/{indexing/ShardIndexingService.java => shard/InternalIndexingStats.java} (67%) rename core/src/test/java/org/elasticsearch/index/{indexing => }/IndexingSlowLogTests.java (94%) create mode 100644 core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index ebdc8b72c74..85644e8523e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -32,7 +32,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.recovery.RecoveryStats; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2597695a1e2..d7d40426a6e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportReplicationAction nodeServicesProvider.getIndicesQueryCache().onClose(shardId))); if (useShadowEngine(primary, indexSettings)) { - indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); + indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index } else { - indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); + indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog); } eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); @@ -552,6 +554,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } catch (Exception e) { logger.warn("failed to refresh index store settings", e); } + try { + slowLog.onRefreshSettings(settings); // this will be refactored soon anyway so duplication is ok here + } catch (Exception e) { + logger.warn("failed to refresh slowlog settings", e); + } } } diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java similarity index 96% rename from core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java rename to core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java index 292c2a16e91..5cd3685b2f8 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java +++ b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.indexing; +package org.elasticsearch.index; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.shard.IndexingOperationListener; import java.io.IOException; import java.util.Locale; @@ -35,7 +36,7 @@ import java.util.concurrent.TimeUnit; /** */ -public final class IndexingSlowLog { +public final class IndexingSlowLog implements IndexingOperationListener { private boolean reformat; @@ -124,8 +125,9 @@ public final class IndexingSlowLog { } } - void postIndex(Engine.Index index, long tookInNanos) { - postIndexing(index.parsedDoc(), tookInNanos); + public void postIndex(Engine.Index index) { + final long took = index.endTime() - index.startTime(); + postIndexing(index.parsedDoc(), took); } /** @@ -192,4 +194,4 @@ public final class IndexingSlowLog { return sb.toString(); } } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 35f1e066d3a..4086bebfa88 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1fcbbee5414..d93ce36686e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -55,12 +55,10 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java deleted file mode 100644 index 39f3dd602fb..00000000000 --- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.index.indexing; - -import org.elasticsearch.index.engine.Engine; - -/** - * An indexing listener for indexing, delete, events. - */ -public abstract class IndexingOperationListener { - - /** - * Called before the indexing occurs. - */ - public Engine.Index preIndex(Engine.Index operation) { - return operation; - } - - /** - * Called after the indexing operation occurred. - */ - public void postIndex(Engine.Index index) { - - } - - /** - * Called after the indexing operation occurred with exception. - */ - public void postIndex(Engine.Index index, Throwable ex) { - - } - - /** - * Called before the delete occurs. - */ - public Engine.Delete preDelete(Engine.Delete delete) { - return delete; - } - - - /** - * Called after the delete operation occurred. - */ - public void postDelete(Engine.Delete delete) { - - } - - /** - * Called after the delete operation occurred with exception. - */ - public void postDelete(Engine.Delete delete, Throwable ex) { - - } -} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 54bd1cd8443..db5e1f7e4a5 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -81,8 +81,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.ShardGetService; -import org.elasticsearch.index.indexing.IndexingStats; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.MapperService; @@ -125,6 +123,8 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -143,7 +143,7 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexCache indexCache; private final Store store; private final MergeSchedulerConfig mergeSchedulerConfig; - private final ShardIndexingService indexingService; + private final InternalIndexingStats internalIndexingStats; private final ShardSearchStats searchService; private final ShardGetService getService; private final ShardIndexWarmerService shardWarmerService; @@ -167,7 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexEventListener indexEventListener; private final IndexSettings idxSettings; private final NodeServicesProvider provider; - private TimeValue refreshInterval; private volatile ScheduledFuture refreshScheduledFuture; @@ -176,6 +175,8 @@ public class IndexShard extends AbstractIndexShardComponent { protected final AtomicReference currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; + private final IndexingOperationListener indexingOperationListeners; + @Nullable private RecoveryState recoveryState; @@ -215,7 +216,7 @@ public class IndexShard extends AbstractIndexShardComponent { public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) { + IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5))); @@ -232,7 +233,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.threadPool = provider.getThreadPool(); this.mapperService = mapperService; this.indexCache = indexCache; - this.indexingService = new ShardIndexingService(shardId, indexSettings); + this.internalIndexingStats = new InternalIndexingStats(); + final List listenersList = new ArrayList<>(Arrays.asList(listeners)); + listenersList.add(internalIndexingStats); + this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); this.getService = new ShardGetService(indexSettings, this, mapperService); this.termVectorsService = provider.getTermVectorsService(); this.searchService = new ShardSearchStats(settings); @@ -285,10 +289,6 @@ public class IndexShard extends AbstractIndexShardComponent { return true; } - public ShardIndexingService indexingService() { - return this.indexingService; - } - public ShardGetService getService() { return this.getService; } @@ -489,7 +489,7 @@ public class IndexShard extends AbstractIndexShardComponent { public boolean index(Engine.Index index) { ensureWriteAllowed(index); markLastWrite(); - index = indexingService.preIndex(index); + index = indexingOperationListeners.preIndex(index); final boolean created; try { if (logger.isTraceEnabled()) { @@ -503,10 +503,10 @@ public class IndexShard extends AbstractIndexShardComponent { } index.endTime(System.nanoTime()); } catch (Throwable ex) { - indexingService.postIndex(index, ex); + indexingOperationListeners.postIndex(index, ex); throw ex; } - indexingService.postIndex(index); + indexingOperationListeners.postIndex(index); return created; } @@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent { public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); markLastWrite(); - delete = indexingService.preDelete(delete); + delete = indexingOperationListeners.preDelete(delete); try { if (logger.isTraceEnabled()) { logger.trace("delete [{}]", delete.uid().text()); @@ -545,10 +545,10 @@ public class IndexShard extends AbstractIndexShardComponent { } delete.endTime(System.nanoTime()); } catch (Throwable ex) { - indexingService.postDelete(delete, ex); + indexingOperationListeners.postDelete(delete, ex); throw ex; } - indexingService.postDelete(delete); + indexingOperationListeners.postDelete(delete); } public Engine.GetResult get(Engine.Get get) { @@ -600,7 +600,7 @@ public class IndexShard extends AbstractIndexShardComponent { throttled = engine.isThrottled(); throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); } - return indexingService.stats(throttled, throttleTimeInMillis, types); + return internalIndexingStats.stats(throttled, throttleTimeInMillis, types); } public SearchStats searchStats(String... groups) { @@ -1222,7 +1222,6 @@ public class IndexShard extends AbstractIndexShardComponent { } mergePolicyConfig.onRefreshSettings(settings); searchService.onRefreshSettings(settings); - indexingService.onRefreshSettings(settings); if (change) { getEngine().onSettingsChanged(); } @@ -1258,6 +1257,14 @@ public class IndexShard extends AbstractIndexShardComponent { return inactiveTime; } + /** + * Should be called for each no-op update operation to increment relevant statistics. + * @param type the doc type of the update + */ + public void noopUpdate(String type) { + internalIndexingStats.noopUpdate(type); + } + class EngineRefresher implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java new file mode 100644 index 00000000000..e5d3574223a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.index.engine.Engine; + +import java.util.List; + +/** + * An indexing listener for indexing, delete, events. + */ +public interface IndexingOperationListener { + + /** + * Called before the indexing occurs. + */ + default Engine.Index preIndex(Engine.Index operation) { + return operation; + } + + /** + * Called after the indexing operation occurred. + */ + default void postIndex(Engine.Index index) {} + + /** + * Called after the indexing operation occurred with exception. + */ + default void postIndex(Engine.Index index, Throwable ex) {} + + /** + * Called before the delete occurs. + */ + default Engine.Delete preDelete(Engine.Delete delete) { + return delete; + } + + + /** + * Called after the delete operation occurred. + */ + default void postDelete(Engine.Delete delete) {} + + /** + * Called after the delete operation occurred with exception. + */ + default void postDelete(Engine.Delete delete, Throwable ex) {} + + /** + * A Composite listener that multiplexes calls to each of the listeners methods. + */ + final class CompositeListener implements IndexingOperationListener{ + private final List listeners; + private final ESLogger logger; + + public CompositeListener(List listeners, ESLogger logger) { + this.listeners = listeners; + this.logger = logger; + } + + @Override + public Engine.Index preIndex(Engine.Index operation) { + assert operation != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.preIndex(operation); + } catch (Throwable t) { + logger.warn("preIndex listener [{}] failed", t, listener); + } + } + return operation; + } + + @Override + public void postIndex(Engine.Index index) { + assert index != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndex(index); + } catch (Throwable t) { + logger.warn("postIndex listener [{}] failed", t, listener); + } + } + } + + @Override + public void postIndex(Engine.Index index, Throwable ex) { + assert index != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndex(index, ex); + } catch (Throwable t) { + logger.warn("postIndex listener [{}] failed", t, listener); + } + } + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.preDelete(delete); + } catch (Throwable t) { + logger.warn("preDelete listener [{}] failed", t, listener); + } + } + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete); + } catch (Throwable t) { + logger.warn("postDelete listener [{}] failed", t, listener); + } + } + } + + @Override + public void postDelete(Engine.Delete delete, Throwable ex) { + assert delete != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete, ex); + } catch (Throwable t) { + logger.warn("postDelete listener [{}] failed", t, listener); + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java similarity index 99% rename from core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java rename to core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java index 07ca8af17e3..27cda2ca1c8 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.indexing; +package org.elasticsearch.index.shard; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java similarity index 67% rename from core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java rename to core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index f7175c02c59..9996d705b33 100644 --- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -17,49 +17,34 @@ * under the License. */ -package org.elasticsearch.index.indexing; +package org.elasticsearch.index.shard; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; /** + * Internal class that maintains relevant indexing statistics / metrics. + * @see IndexShard */ -public class ShardIndexingService extends AbstractIndexShardComponent { - - private final IndexingSlowLog slowLog; - +final class InternalIndexingStats implements IndexingOperationListener { private final StatsHolder totalStats = new StatsHolder(); - - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - private volatile Map typesStats = emptyMap(); - public ShardIndexingService(ShardId shardId, IndexSettings indexSettings) { - super(shardId, indexSettings); - this.slowLog = new IndexingSlowLog(this.indexSettings.getSettings()); - } - /** * Returns the stats, including type specific stats. If the types are null/0 length, then nothing * is returned for them. If they are set, then only types provided will be returned, or * _all for all types. */ - public IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) { + IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) { IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis); Map typesSt = null; if (types != null && types.length > 0) { @@ -79,20 +64,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return new IndexingStats(total, typesSt); } - public void addListener(IndexingOperationListener listener) { - listeners.add(listener); - } - - public void removeListener(IndexingOperationListener listener) { - listeners.remove(listener); - } public Engine.Index preIndex(Engine.Index operation) { totalStats.indexCurrent.inc(); typeStats(operation.type()).indexCurrent.inc(); - for (IndexingOperationListener listener : listeners) { - operation = listener.preIndex(operation); - } return operation; } @@ -103,14 +78,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent { StatsHolder typeStats = typeStats(index.type()); typeStats.indexMetric.inc(took); typeStats.indexCurrent.dec(); - slowLog.postIndex(index, took); - for (IndexingOperationListener listener : listeners) { - try { - listener.postIndex(index); - } catch (Exception e) { - logger.warn("postIndex listener [{}] failed", e, listener); - } - } } public void postIndex(Engine.Index index, Throwable ex) { @@ -118,21 +85,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { typeStats(index.type()).indexCurrent.dec(); totalStats.indexFailed.inc(); typeStats(index.type()).indexFailed.inc(); - for (IndexingOperationListener listener : listeners) { - try { - listener.postIndex(index, ex); - } catch (Throwable t) { - logger.warn("postIndex listener [{}] failed", t, listener); - } - } } public Engine.Delete preDelete(Engine.Delete delete) { totalStats.deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc(); - for (IndexingOperationListener listener : listeners) { - delete = listener.preDelete(delete); - } return delete; } @@ -144,25 +101,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { StatsHolder typeStats = typeStats(delete.type()); typeStats.deleteMetric.inc(took); typeStats.deleteCurrent.dec(); - for (IndexingOperationListener listener : listeners) { - try { - listener.postDelete(delete); - } catch (Exception e) { - logger.warn("postDelete listener [{}] failed", e, listener); - } - } } public void postDelete(Engine.Delete delete, Throwable ex) { totalStats.deleteCurrent.dec(); typeStats(delete.type()).deleteCurrent.dec(); - for (IndexingOperationListener listener : listeners) { - try { - listener. postDelete(delete, ex); - } catch (Throwable t) { - logger.warn("postDelete listener [{}] failed", t, listener); - } - } } public void noopUpdate(String type) { @@ -170,7 +113,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent { typeStats(type).noopUpdates.inc(); } - public void clear() { + public void clear() { // NOCOMMIT - this is unused? totalStats.clear(); synchronized (this) { if (!typesStats.isEmpty()) { @@ -200,10 +143,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent { return stats; } - public void onRefreshSettings(Settings settings) { - slowLog.onRefreshSettings(settings); - } - static class StatsHolder { public final MeanMetric indexMetric = new MeanMetric(); public final MeanMetric deleteMetric = new MeanMetric(); @@ -214,9 +153,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) { return new IndexingStats.Stats( - indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(), - deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), - noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); + indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(), + deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), + noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); } public long totalCurrent() { diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 36ed70ae65a..392a58686eb 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -52,7 +52,7 @@ import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; diff --git a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index c8142f3d37a..0a036cbd801 100644 --- a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -36,7 +36,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.recovery.RecoveryStats; diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index e86132a909e..110aa90047f 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -41,7 +41,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.refresh.RefreshStats; diff --git a/core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java similarity index 94% rename from core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java rename to core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java index ccbef6837c9..e39c0a805fe 100644 --- a/core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java @@ -17,14 +17,14 @@ * under the License. */ -package org.elasticsearch.index.indexing; +package org.elasticsearch.index; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.IntField; import org.apache.lucene.document.StringField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.indexing.IndexingSlowLog.SlowLogParsedDocumentPrinter; +import org.elasticsearch.index.IndexingSlowLog.SlowLogParsedDocumentPrinter; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.test.ESTestCase; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 66e0a0655d6..d18e279636b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -76,8 +76,6 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.flush.FlushStats; -import org.elasticsearch.index.indexing.IndexingOperationListener; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; @@ -109,6 +107,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -612,77 +611,77 @@ public class IndexShardTests extends ESSingleNodeTestCase { return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate); } - public void testPreIndex() throws IOException { - createIndex("testpreindex"); + public void testIndexingOperationsListeners() throws IOException { + createIndex("test_iol"); ensureGreen(); + client().prepareIndex("test_iol", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService("testpreindex"); + IndexService test = indicesService.indexService("test_iol"); IndexShard shard = test.getShardOrNull(0); - ShardIndexingService shardIndexingService = shard.indexingService(); - final AtomicBoolean preIndexCalled = new AtomicBoolean(false); - - shardIndexingService.addListener(new IndexingOperationListener() { + AtomicInteger preIndex = new AtomicInteger(); + AtomicInteger postIndex = new AtomicInteger(); + AtomicInteger postIndexException = new AtomicInteger(); + AtomicInteger preDelete = new AtomicInteger(); + AtomicInteger postDelete = new AtomicInteger(); + AtomicInteger postDeleteException = new AtomicInteger(); + shard = reinitWithWrapper(test, shard, null, new IndexingOperationListener() { @Override public Engine.Index preIndex(Engine.Index operation) { - preIndexCalled.set(true); - return super.preIndex(operation); + preIndex.incrementAndGet(); + return operation; } - }); - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); - Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); - assertTrue(preIndexCalled.get()); - } - - public void testPostIndex() throws IOException { - createIndex("testpostindex"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService("testpostindex"); - IndexShard shard = test.getShardOrNull(0); - ShardIndexingService shardIndexingService = shard.indexingService(); - final AtomicBoolean postIndexCalled = new AtomicBoolean(false); - - shardIndexingService.addListener(new IndexingOperationListener() { @Override public void postIndex(Engine.Index index) { - postIndexCalled.set(true); - super.postIndex(index); + postIndex.incrementAndGet(); + } + + @Override + public void postIndex(Engine.Index index, Throwable ex) { + postIndexException.incrementAndGet(); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + preDelete.incrementAndGet(); + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + postDelete.incrementAndGet(); + } + + @Override + public void postDelete(Engine.Delete delete, Throwable ex) { + postDeleteException.incrementAndGet(); + } }); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); shard.index(index); - assertTrue(postIndexCalled.get()); - } + assertEquals(1, preIndex.get()); + assertEquals(1, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(0, preDelete.get()); + assertEquals(0, postDelete.get()); + assertEquals(0, postDeleteException.get()); - public void testPostIndexWithException() throws IOException { - createIndex("testpostindexwithexception"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService("testpostindexwithexception"); - IndexShard shard = test.getShardOrNull(0); - ShardIndexingService shardIndexingService = shard.indexingService(); + Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); + shard.delete(delete); + + assertEquals(1, preIndex.get()); + assertEquals(1, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(1, preDelete.get()); + assertEquals(1, postDelete.get()); + assertEquals(0, postDeleteException.get()); shard.close("Unexpected close", true); shard.state = IndexShardState.STARTED; // It will generate exception - final AtomicBoolean postIndexWithExceptionCalled = new AtomicBoolean(false); - - shardIndexingService.addListener(new IndexingOperationListener() { - @Override - public void postIndex(Engine.Index index, Throwable ex) { - assertNotNull(ex); - postIndexWithExceptionCalled.set(true); - super.postIndex(index, ex); - } - }); - - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); - Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - try { shard.index(index); fail(); @@ -690,7 +689,26 @@ public class IndexShardTests extends ESSingleNodeTestCase { } - assertTrue(postIndexWithExceptionCalled.get()); + assertEquals(2, preIndex.get()); + assertEquals(1, postIndex.get()); + assertEquals(1, postIndexException.get()); + assertEquals(1, preDelete.get()); + assertEquals(1, postDelete.get()); + assertEquals(0, postDeleteException.get()); + try { + shard.delete(delete); + fail(); + }catch (IllegalIndexShardStateException e){ + + } + + assertEquals(2, preIndex.get()); + assertEquals(1, postIndex.get()); + assertEquals(1, postIndexException.get()); + assertEquals(2, preDelete.get()); + assertEquals(1, postDelete.get()); + assertEquals(1, postDeleteException.get()); + } public void testMaybeFlush() throws Exception { @@ -1041,11 +1059,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { // test will fail due to unclosed searchers if the searcher is not released } - private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException { + private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { ShardRouting routing = new ShardRouting(shard.routingEntry()); shard.close("simon says", true); NodeServicesProvider indexServices = indexService.getIndexServices(); - IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices); + IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, listeners); ShardRoutingHelper.reinit(routing); newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java new file mode 100644 index 00000000000..92bbf06a7b6 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.Term; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class IndexingOperationListenerTests extends ESTestCase{ + + // this test also tests if calls are correct if one or more listeners throw exceptions + public void testListenersAreExecuted() { + AtomicInteger preIndex = new AtomicInteger(); + AtomicInteger postIndex = new AtomicInteger(); + AtomicInteger postIndexException = new AtomicInteger(); + AtomicInteger preDelete = new AtomicInteger(); + AtomicInteger postDelete = new AtomicInteger(); + AtomicInteger postDeleteException = new AtomicInteger(); + IndexingOperationListener listener = new IndexingOperationListener() { + @Override + public Engine.Index preIndex(Engine.Index operation) { + preIndex.incrementAndGet(); + return operation; + } + + @Override + public void postIndex(Engine.Index index) { + postIndex.incrementAndGet(); + } + + @Override + public void postIndex(Engine.Index index, Throwable ex) { + postIndexException.incrementAndGet(); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + preDelete.incrementAndGet(); + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + postDelete.incrementAndGet(); + } + + @Override + public void postDelete(Engine.Delete delete, Throwable ex) { + postDeleteException.incrementAndGet(); + } + }; + + IndexingOperationListener throwingListener = new IndexingOperationListener() { + @Override + public Engine.Index preIndex(Engine.Index operation) { + throw new RuntimeException(); + } + + @Override + public void postIndex(Engine.Index index) { + throw new RuntimeException(); } + + @Override + public void postIndex(Engine.Index index, Throwable ex) { + throw new RuntimeException(); } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + throw new RuntimeException(); + } + + @Override + public void postDelete(Engine.Delete delete) { + throw new RuntimeException(); } + + @Override + public void postDelete(Engine.Delete delete, Throwable ex) { + throw new RuntimeException(); + } + }; + final List indexingOperationListeners = new ArrayList<>(Arrays.asList(listener, listener)); + if (randomBoolean()) { + indexingOperationListeners.add(throwingListener); + if (randomBoolean()) { + indexingOperationListeners.add(throwingListener); + } + } + Collections.shuffle(indexingOperationListeners, random()); + IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); + Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); + Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); + compositeListener.postDelete(delete); + assertEquals(0, preIndex.get()); + assertEquals(0, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(0, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(0, postDeleteException.get()); + + compositeListener.postDelete(delete, new RuntimeException()); + assertEquals(0, preIndex.get()); + assertEquals(0, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(0, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(2, postDeleteException.get()); + + compositeListener.preDelete(delete); + assertEquals(0, preIndex.get()); + assertEquals(0, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(2, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(2, postDeleteException.get()); + + compositeListener.postIndex(index); + assertEquals(0, preIndex.get()); + assertEquals(2, postIndex.get()); + assertEquals(0, postIndexException.get()); + assertEquals(2, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(2, postDeleteException.get()); + + compositeListener.postIndex(index, new RuntimeException()); + assertEquals(0, preIndex.get()); + assertEquals(2, postIndex.get()); + assertEquals(2, postIndexException.get()); + assertEquals(2, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(2, postDeleteException.get()); + + compositeListener.preIndex(index); + assertEquals(2, preIndex.get()); + assertEquals(2, postIndex.get()); + assertEquals(2, postIndexException.get()); + assertEquals(2, preDelete.get()); + assertEquals(2, postDelete.get()); + assertEquals(2, postDeleteException.get()); + } +}