Cleanup IndexingOperationListeners infrastructure

This commit reduces the former ShardIndexinService to a simple stats/metrics
class, moves IndexingSlowLog to the IndexService level since it can be shared
across shards of an index and is now hidden behind IndexingOperationListener.

IndexingOperationListener is now a first class citizen in IndexShard and is passed
in from IndexService.
This commit is contained in:
Simon Willnauer 2016-01-09 21:50:17 +01:00
parent 2dbad1d65a
commit 54d1e35d84
19 changed files with 449 additions and 235 deletions

View File

@ -32,7 +32,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.recovery.RecoveryStats;

View File

@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
} }
case NONE: case NONE:
UpdateResponse updateResponse = translate.action(); UpdateResponse updateResponse = translate.action();
indexShard.indexingService().noopUpdate(updateRequest.type()); indexShard.noopUpdate(updateRequest.type());
return new UpdateResult(translate, updateResponse); return new UpdateResult(translate, updateResponse);
default: default:
throw new IllegalStateException("Illegal update operation " + translate.operation()); throw new IllegalStateException("Illegal update operation " + translate.operation());

View File

@ -269,7 +269,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
if (indexServiceOrNull != null) { if (indexServiceOrNull != null) {
IndexShard shard = indexService.getShardOrNull(request.shardId()); IndexShard shard = indexService.getShardOrNull(request.shardId());
if (shard != null) { if (shard != null) {
shard.indexingService().noopUpdate(request.type()); shard.noopUpdate(request.type());
} }
} }
listener.onResponse(update); listener.onResponse(update);

View File

@ -67,7 +67,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.indexing.IndexingSlowLog; import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.search.stats.SearchSlowLog; import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.settings.IndexDynamicSettings; import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;

View File

@ -45,7 +45,6 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardContext;
@ -68,6 +67,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -101,6 +101,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings; private final IndexSettings indexSettings;
private final IndexingSlowLog slowLog;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService, SimilarityService similarityService,
@ -130,6 +131,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.engineFactory = engineFactory; this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this); this.searcherWrapper = wrapperFactory.newWrapper(this);
this.slowLog = new IndexingSlowLog(indexSettings.getSettings());
} }
public int numberOfShards() { public int numberOfShards() {
@ -292,9 +294,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); (primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId))); store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
if (useShadowEngine(primary, indexSettings)) { if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index
} else { } else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog);
} }
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
@ -552,6 +554,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to refresh index store settings", e); logger.warn("failed to refresh index store settings", e);
} }
try {
slowLog.onRefreshSettings(settings); // this will be refactored soon anyway so duplication is ok here
} catch (Exception e) {
logger.warn("failed to refresh slowlog settings", e);
}
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.indexing; package org.elasticsearch.index;
import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.IndexingOperationListener;
import java.io.IOException; import java.io.IOException;
import java.util.Locale; import java.util.Locale;
@ -35,7 +36,7 @@ import java.util.concurrent.TimeUnit;
/** /**
*/ */
public final class IndexingSlowLog { public final class IndexingSlowLog implements IndexingOperationListener {
private boolean reformat; private boolean reformat;
@ -124,8 +125,9 @@ public final class IndexingSlowLog {
} }
} }
void postIndex(Engine.Index index, long tookInNanos) { public void postIndex(Engine.Index index) {
postIndexing(index.parsedDoc(), tookInNanos); final long took = index.endTime() - index.startTime();
postIndexing(index.parsedDoc(), took);
} }
/** /**
@ -192,4 +194,4 @@ public final class IndexingSlowLog {
return sb.toString(); return sb.toString();
} }
} }
} }

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;

View File

@ -55,12 +55,10 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.OnGoingMerge;

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.indexing;
import org.elasticsearch.index.engine.Engine;
/**
* An indexing listener for indexing, delete, events.
*/
public abstract class IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
public Engine.Index preIndex(Engine.Index operation) {
return operation;
}
/**
* Called after the indexing operation occurred.
*/
public void postIndex(Engine.Index index) {
}
/**
* Called after the indexing operation occurred with exception.
*/
public void postIndex(Engine.Index index, Throwable ex) {
}
/**
* Called before the delete occurs.
*/
public Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
/**
* Called after the delete operation occurred.
*/
public void postDelete(Engine.Delete delete) {
}
/**
* Called after the delete operation occurred with exception.
*/
public void postDelete(Engine.Delete delete, Throwable ex) {
}
}

View File

@ -81,8 +81,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService; import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
@ -125,6 +123,8 @@ import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -143,7 +143,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache; private final IndexCache indexCache;
private final Store store; private final Store store;
private final MergeSchedulerConfig mergeSchedulerConfig; private final MergeSchedulerConfig mergeSchedulerConfig;
private final ShardIndexingService indexingService; private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchService; private final ShardSearchStats searchService;
private final ShardGetService getService; private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService; private final ShardIndexWarmerService shardWarmerService;
@ -167,7 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexEventListener indexEventListener; private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings; private final IndexSettings idxSettings;
private final NodeServicesProvider provider; private final NodeServicesProvider provider;
private TimeValue refreshInterval; private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture; private volatile ScheduledFuture<?> refreshScheduledFuture;
@ -176,6 +175,8 @@ public class IndexShard extends AbstractIndexShardComponent {
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>(); protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory; protected final EngineFactory engineFactory;
private final IndexingOperationListener indexingOperationListeners;
@Nullable @Nullable
private RecoveryState recoveryState; private RecoveryState recoveryState;
@ -215,7 +216,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) { IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
super(shardId, indexSettings); super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings(); final Settings settings = indexSettings.getSettings();
this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5))); this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
@ -232,7 +233,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.threadPool = provider.getThreadPool(); this.threadPool = provider.getThreadPool();
this.mapperService = mapperService; this.mapperService = mapperService;
this.indexCache = indexCache; this.indexCache = indexCache;
this.indexingService = new ShardIndexingService(shardId, indexSettings); this.internalIndexingStats = new InternalIndexingStats();
final List<IndexingOperationListener> listenersList = new ArrayList<>(Arrays.asList(listeners));
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService); this.getService = new ShardGetService(indexSettings, this, mapperService);
this.termVectorsService = provider.getTermVectorsService(); this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(settings); this.searchService = new ShardSearchStats(settings);
@ -285,10 +289,6 @@ public class IndexShard extends AbstractIndexShardComponent {
return true; return true;
} }
public ShardIndexingService indexingService() {
return this.indexingService;
}
public ShardGetService getService() { public ShardGetService getService() {
return this.getService; return this.getService;
} }
@ -489,7 +489,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public boolean index(Engine.Index index) { public boolean index(Engine.Index index) {
ensureWriteAllowed(index); ensureWriteAllowed(index);
markLastWrite(); markLastWrite();
index = indexingService.preIndex(index); index = indexingOperationListeners.preIndex(index);
final boolean created; final boolean created;
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -503,10 +503,10 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
index.endTime(System.nanoTime()); index.endTime(System.nanoTime());
} catch (Throwable ex) { } catch (Throwable ex) {
indexingService.postIndex(index, ex); indexingOperationListeners.postIndex(index, ex);
throw ex; throw ex;
} }
indexingService.postIndex(index); indexingOperationListeners.postIndex(index);
return created; return created;
} }
@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public void delete(Engine.Delete delete) { public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete); ensureWriteAllowed(delete);
markLastWrite(); markLastWrite();
delete = indexingService.preDelete(delete); delete = indexingOperationListeners.preDelete(delete);
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text()); logger.trace("delete [{}]", delete.uid().text());
@ -545,10 +545,10 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
delete.endTime(System.nanoTime()); delete.endTime(System.nanoTime());
} catch (Throwable ex) { } catch (Throwable ex) {
indexingService.postDelete(delete, ex); indexingOperationListeners.postDelete(delete, ex);
throw ex; throw ex;
} }
indexingService.postDelete(delete); indexingOperationListeners.postDelete(delete);
} }
public Engine.GetResult get(Engine.Get get) { public Engine.GetResult get(Engine.Get get) {
@ -600,7 +600,7 @@ public class IndexShard extends AbstractIndexShardComponent {
throttled = engine.isThrottled(); throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
} }
return indexingService.stats(throttled, throttleTimeInMillis, types); return internalIndexingStats.stats(throttled, throttleTimeInMillis, types);
} }
public SearchStats searchStats(String... groups) { public SearchStats searchStats(String... groups) {
@ -1222,7 +1222,6 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
mergePolicyConfig.onRefreshSettings(settings); mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings); searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) { if (change) {
getEngine().onSettingsChanged(); getEngine().onSettingsChanged();
} }
@ -1258,6 +1257,14 @@ public class IndexShard extends AbstractIndexShardComponent {
return inactiveTime; return inactiveTime;
} }
/**
* Should be called for each no-op update operation to increment relevant statistics.
* @param type the doc type of the update
*/
public void noopUpdate(String type) {
internalIndexingStats.noopUpdate(type);
}
class EngineRefresher implements Runnable { class EngineRefresher implements Runnable {
@Override @Override
public void run() { public void run() {

View File

@ -0,0 +1,152 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.Engine;
import java.util.List;
/**
* An indexing listener for indexing, delete, events.
*/
public interface IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
default Engine.Index preIndex(Engine.Index operation) {
return operation;
}
/**
* Called after the indexing operation occurred.
*/
default void postIndex(Engine.Index index) {}
/**
* Called after the indexing operation occurred with exception.
*/
default void postIndex(Engine.Index index, Throwable ex) {}
/**
* Called before the delete occurs.
*/
default Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
/**
* Called after the delete operation occurred.
*/
default void postDelete(Engine.Delete delete) {}
/**
* Called after the delete operation occurred with exception.
*/
default void postDelete(Engine.Delete delete, Throwable ex) {}
/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeListener implements IndexingOperationListener{
private final List<IndexingOperationListener> listeners;
private final ESLogger logger;
public CompositeListener(List<IndexingOperationListener> listeners, ESLogger logger) {
this.listeners = listeners;
this.logger = logger;
}
@Override
public Engine.Index preIndex(Engine.Index operation) {
assert operation != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preIndex(operation);
} catch (Throwable t) {
logger.warn("preIndex listener [{}] failed", t, listener);
}
}
return operation;
}
@Override
public void postIndex(Engine.Index index) {
assert index != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index);
} catch (Throwable t) {
logger.warn("postIndex listener [{}] failed", t, listener);
}
}
}
@Override
public void postIndex(Engine.Index index, Throwable ex) {
assert index != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, ex);
} catch (Throwable t) {
logger.warn("postIndex listener [{}] failed", t, listener);
}
}
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preDelete(delete);
} catch (Throwable t) {
logger.warn("preDelete listener [{}] failed", t, listener);
}
}
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete);
} catch (Throwable t) {
logger.warn("postDelete listener [{}] failed", t, listener);
}
}
}
@Override
public void postDelete(Engine.Delete delete, Throwable ex) {
assert delete != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete, ex);
} catch (Throwable t) {
logger.warn("postDelete listener [{}] failed", t, listener);
}
}
}
}
}

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.indexing; package org.elasticsearch.index.shard;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;

View File

@ -17,49 +17,34 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.indexing; package org.elasticsearch.index.shard;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
/** /**
* Internal class that maintains relevant indexing statistics / metrics.
* @see IndexShard
*/ */
public class ShardIndexingService extends AbstractIndexShardComponent { final class InternalIndexingStats implements IndexingOperationListener {
private final IndexingSlowLog slowLog;
private final StatsHolder totalStats = new StatsHolder(); private final StatsHolder totalStats = new StatsHolder();
private final CopyOnWriteArrayList<IndexingOperationListener> listeners = new CopyOnWriteArrayList<>();
private volatile Map<String, StatsHolder> typesStats = emptyMap(); private volatile Map<String, StatsHolder> typesStats = emptyMap();
public ShardIndexingService(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
this.slowLog = new IndexingSlowLog(this.indexSettings.getSettings());
}
/** /**
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing * Returns the stats, including type specific stats. If the types are null/0 length, then nothing
* is returned for them. If they are set, then only types provided will be returned, or * is returned for them. If they are set, then only types provided will be returned, or
* <tt>_all</tt> for all types. * <tt>_all</tt> for all types.
*/ */
public IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) { IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) {
IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis); IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis);
Map<String, IndexingStats.Stats> typesSt = null; Map<String, IndexingStats.Stats> typesSt = null;
if (types != null && types.length > 0) { if (types != null && types.length > 0) {
@ -79,20 +64,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return new IndexingStats(total, typesSt); return new IndexingStats(total, typesSt);
} }
public void addListener(IndexingOperationListener listener) {
listeners.add(listener);
}
public void removeListener(IndexingOperationListener listener) {
listeners.remove(listener);
}
public Engine.Index preIndex(Engine.Index operation) { public Engine.Index preIndex(Engine.Index operation) {
totalStats.indexCurrent.inc(); totalStats.indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc(); typeStats(operation.type()).indexCurrent.inc();
for (IndexingOperationListener listener : listeners) {
operation = listener.preIndex(operation);
}
return operation; return operation;
} }
@ -103,14 +78,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
StatsHolder typeStats = typeStats(index.type()); StatsHolder typeStats = typeStats(index.type());
typeStats.indexMetric.inc(took); typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec(); typeStats.indexCurrent.dec();
slowLog.postIndex(index, took);
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index);
} catch (Exception e) {
logger.warn("postIndex listener [{}] failed", e, listener);
}
}
} }
public void postIndex(Engine.Index index, Throwable ex) { public void postIndex(Engine.Index index, Throwable ex) {
@ -118,21 +85,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats(index.type()).indexCurrent.dec(); typeStats(index.type()).indexCurrent.dec();
totalStats.indexFailed.inc(); totalStats.indexFailed.inc();
typeStats(index.type()).indexFailed.inc(); typeStats(index.type()).indexFailed.inc();
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, ex);
} catch (Throwable t) {
logger.warn("postIndex listener [{}] failed", t, listener);
}
}
} }
public Engine.Delete preDelete(Engine.Delete delete) { public Engine.Delete preDelete(Engine.Delete delete) {
totalStats.deleteCurrent.inc(); totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc();
for (IndexingOperationListener listener : listeners) {
delete = listener.preDelete(delete);
}
return delete; return delete;
} }
@ -144,25 +101,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
StatsHolder typeStats = typeStats(delete.type()); StatsHolder typeStats = typeStats(delete.type());
typeStats.deleteMetric.inc(took); typeStats.deleteMetric.inc(took);
typeStats.deleteCurrent.dec(); typeStats.deleteCurrent.dec();
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete);
} catch (Exception e) {
logger.warn("postDelete listener [{}] failed", e, listener);
}
}
} }
public void postDelete(Engine.Delete delete, Throwable ex) { public void postDelete(Engine.Delete delete, Throwable ex) {
totalStats.deleteCurrent.dec(); totalStats.deleteCurrent.dec();
typeStats(delete.type()).deleteCurrent.dec(); typeStats(delete.type()).deleteCurrent.dec();
for (IndexingOperationListener listener : listeners) {
try {
listener. postDelete(delete, ex);
} catch (Throwable t) {
logger.warn("postDelete listener [{}] failed", t, listener);
}
}
} }
public void noopUpdate(String type) { public void noopUpdate(String type) {
@ -170,7 +113,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats(type).noopUpdates.inc(); typeStats(type).noopUpdates.inc();
} }
public void clear() { public void clear() { // NOCOMMIT - this is unused?
totalStats.clear(); totalStats.clear();
synchronized (this) { synchronized (this) {
if (!typesStats.isEmpty()) { if (!typesStats.isEmpty()) {
@ -200,10 +143,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return stats; return stats;
} }
public void onRefreshSettings(Settings settings) {
slowLog.onRefreshSettings(settings);
}
static class StatsHolder { static class StatsHolder {
public final MeanMetric indexMetric = new MeanMetric(); public final MeanMetric indexMetric = new MeanMetric();
public final MeanMetric deleteMetric = new MeanMetric(); public final MeanMetric deleteMetric = new MeanMetric();
@ -214,9 +153,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) { public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
return new IndexingStats.Stats( return new IndexingStats.Stats(
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(), indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(),
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
} }
public long totalCurrent() { public long totalCurrent() {

View File

@ -52,7 +52,7 @@ import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;

View File

@ -36,7 +36,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.recovery.RecoveryStats;

View File

@ -41,7 +41,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats; import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;

View File

@ -17,14 +17,14 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.indexing; package org.elasticsearch.index;
import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField; import org.apache.lucene.document.IntField;
import org.apache.lucene.document.StringField; import org.apache.lucene.document.StringField;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.indexing.IndexingSlowLog.SlowLogParsedDocumentPrinter; import org.elasticsearch.index.IndexingSlowLog.SlowLogParsedDocumentPrinter;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;

View File

@ -76,8 +76,6 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParseContext;
@ -109,6 +107,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -612,77 +611,77 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate); return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate);
} }
public void testPreIndex() throws IOException { public void testIndexingOperationsListeners() throws IOException {
createIndex("testpreindex"); createIndex("test_iol");
ensureGreen(); ensureGreen();
client().prepareIndex("test_iol", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testpreindex"); IndexService test = indicesService.indexService("test_iol");
IndexShard shard = test.getShardOrNull(0); IndexShard shard = test.getShardOrNull(0);
ShardIndexingService shardIndexingService = shard.indexingService(); AtomicInteger preIndex = new AtomicInteger();
final AtomicBoolean preIndexCalled = new AtomicBoolean(false); AtomicInteger postIndex = new AtomicInteger();
AtomicInteger postIndexException = new AtomicInteger();
shardIndexingService.addListener(new IndexingOperationListener() { AtomicInteger preDelete = new AtomicInteger();
AtomicInteger postDelete = new AtomicInteger();
AtomicInteger postDeleteException = new AtomicInteger();
shard = reinitWithWrapper(test, shard, null, new IndexingOperationListener() {
@Override @Override
public Engine.Index preIndex(Engine.Index operation) { public Engine.Index preIndex(Engine.Index operation) {
preIndexCalled.set(true); preIndex.incrementAndGet();
return super.preIndex(operation); return operation;
} }
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index);
assertTrue(preIndexCalled.get());
}
public void testPostIndex() throws IOException {
createIndex("testpostindex");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testpostindex");
IndexShard shard = test.getShardOrNull(0);
ShardIndexingService shardIndexingService = shard.indexingService();
final AtomicBoolean postIndexCalled = new AtomicBoolean(false);
shardIndexingService.addListener(new IndexingOperationListener() {
@Override @Override
public void postIndex(Engine.Index index) { public void postIndex(Engine.Index index) {
postIndexCalled.set(true); postIndex.incrementAndGet();
super.postIndex(index); }
@Override
public void postIndex(Engine.Index index, Throwable ex) {
postIndexException.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
@Override
public void postDelete(Engine.Delete delete, Throwable ex) {
postDeleteException.incrementAndGet();
} }
}); });
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index); shard.index(index);
assertTrue(postIndexCalled.get()); assertEquals(1, preIndex.get());
} assertEquals(1, postIndex.get());
assertEquals(0, postIndexException.get());
assertEquals(0, preDelete.get());
assertEquals(0, postDelete.get());
assertEquals(0, postDeleteException.get());
public void testPostIndexWithException() throws IOException { Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
createIndex("testpostindexwithexception"); shard.delete(delete);
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class); assertEquals(1, preIndex.get());
IndexService test = indicesService.indexService("testpostindexwithexception"); assertEquals(1, postIndex.get());
IndexShard shard = test.getShardOrNull(0); assertEquals(0, postIndexException.get());
ShardIndexingService shardIndexingService = shard.indexingService(); assertEquals(1, preDelete.get());
assertEquals(1, postDelete.get());
assertEquals(0, postDeleteException.get());
shard.close("Unexpected close", true); shard.close("Unexpected close", true);
shard.state = IndexShardState.STARTED; // It will generate exception shard.state = IndexShardState.STARTED; // It will generate exception
final AtomicBoolean postIndexWithExceptionCalled = new AtomicBoolean(false);
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, Throwable ex) {
assertNotNull(ex);
postIndexWithExceptionCalled.set(true);
super.postIndex(index, ex);
}
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
try { try {
shard.index(index); shard.index(index);
fail(); fail();
@ -690,7 +689,26 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} }
assertTrue(postIndexWithExceptionCalled.get()); assertEquals(2, preIndex.get());
assertEquals(1, postIndex.get());
assertEquals(1, postIndexException.get());
assertEquals(1, preDelete.get());
assertEquals(1, postDelete.get());
assertEquals(0, postDeleteException.get());
try {
shard.delete(delete);
fail();
}catch (IllegalIndexShardStateException e){
}
assertEquals(2, preIndex.get());
assertEquals(1, postIndex.get());
assertEquals(1, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(1, postDelete.get());
assertEquals(1, postDeleteException.get());
} }
public void testMaybeFlush() throws Exception { public void testMaybeFlush() throws Exception {
@ -1041,11 +1059,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// test will fail due to unclosed searchers if the searcher is not released // test will fail due to unclosed searchers if the searcher is not released
} }
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException { private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry()); ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true); shard.close("simon says", true);
NodeServicesProvider indexServices = indexService.getIndexServices(); NodeServicesProvider indexServices = indexService.getIndexServices();
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices); IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, listeners);
ShardRoutingHelper.reinit(routing); ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false); newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);

View File

@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.Term;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class IndexingOperationListenerTests extends ESTestCase{
// this test also tests if calls are correct if one or more listeners throw exceptions
public void testListenersAreExecuted() {
AtomicInteger preIndex = new AtomicInteger();
AtomicInteger postIndex = new AtomicInteger();
AtomicInteger postIndexException = new AtomicInteger();
AtomicInteger preDelete = new AtomicInteger();
AtomicInteger postDelete = new AtomicInteger();
AtomicInteger postDeleteException = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index) {
postIndex.incrementAndGet();
}
@Override
public void postIndex(Engine.Index index, Throwable ex) {
postIndexException.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
@Override
public void postDelete(Engine.Delete delete, Throwable ex) {
postDeleteException.incrementAndGet();
}
};
IndexingOperationListener throwingListener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
throw new RuntimeException();
}
@Override
public void postIndex(Engine.Index index) {
throw new RuntimeException(); }
@Override
public void postIndex(Engine.Index index, Throwable ex) {
throw new RuntimeException(); }
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
throw new RuntimeException();
}
@Override
public void postDelete(Engine.Delete delete) {
throw new RuntimeException(); }
@Override
public void postDelete(Engine.Delete delete, Throwable ex) {
throw new RuntimeException();
}
};
final List<IndexingOperationListener> indexingOperationListeners = new ArrayList<>(Arrays.asList(listener, listener));
if (randomBoolean()) {
indexingOperationListeners.add(throwingListener);
if (randomBoolean()) {
indexingOperationListeners.add(throwingListener);
}
}
Collections.shuffle(indexingOperationListeners, random());
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
compositeListener.postDelete(delete);
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
assertEquals(0, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(0, postDeleteException.get());
compositeListener.postDelete(delete, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
assertEquals(0, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preDelete(delete);
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index);
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(0, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preIndex(index);
assertEquals(2, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
}
}