Hide engine entirely in IndexShard and do searcher wrapping only on top of the engine

This commit is contained in:
Simon Willnauer 2015-10-01 20:00:10 +02:00
parent d2e3e8cc7b
commit a892a35f40
19 changed files with 177 additions and 142 deletions

View File

@ -95,6 +95,6 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) { protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardRouting.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRouting.id()); IndexShard indexShard = indexService.shardSafe(shardRouting.id());
return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose())); return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose()));
} }
} }

View File

@ -97,7 +97,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction
protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) { protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id());
List<Segment> segments = indexShard.engine().segments(false); List<Segment> segments = indexShard.segments(false);
long total_bytes = 0; long total_bytes = 0;
long to_upgrade_bytes = 0; long to_upgrade_bytes = 0;
long to_upgrade_bytes_ancient = 0; long to_upgrade_bytes_ancient = 0;

View File

@ -20,8 +20,10 @@
package org.elasticsearch.index; package org.elasticsearch.index;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
/** /**
* *
@ -30,11 +32,19 @@ public class IndexModule extends AbstractModule {
// pkg private so tests can mock // pkg private so tests can mock
Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class; Class<? extends EngineFactory> engineFactoryImpl = InternalEngineFactory.class;
Class<? extends IndexSearcherWrapper> indexSearcherWrapper = null;
@Override @Override
protected void configure() { protected void configure() {
bind(EngineFactory.class).to(engineFactoryImpl); bind(EngineFactory.class).to(engineFactoryImpl).asEagerSingleton();
if (indexSearcherWrapper == null) {
bind(IndexSearcherWrapper.class).toProvider(Providers.of(null));
} else {
bind(IndexSearcherWrapper.class).to(indexSearcherWrapper).asEagerSingleton();
}
bind(IndexService.class).asEagerSingleton(); bind(IndexService.class).asEagerSingleton();
bind(IndexServicesProvider.class).asEagerSingleton(); bind(IndexServicesProvider.class).asEagerSingleton();
} }
} }

View File

@ -390,23 +390,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
} }
} }
/**
* Closes an optional resource. Returns true if the resource was found;
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
*/
private boolean closeInjectorOptionalResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable> toClose) {
try {
final Closeable instance = shardInjector.getInstance(toClose);
if (instance == null) {
return false;
}
IOUtils.close(instance);
} catch (Throwable t) {
logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(toClose.getSimpleName()));
}
return true;
}
private void onShardClose(ShardLock lock, boolean ownsShard) { private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted if (deleted.get()) { // we remove that shards content if this index has been deleted

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.index; package org.elasticsearch.index;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
@ -27,6 +28,7 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.termvectors.TermVectorsService; import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
@ -56,8 +58,12 @@ public final class IndexServicesProvider {
private final EngineFactory factory; private final EngineFactory factory;
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final IndexSearcherWrapper indexSearcherWrapper;
@Inject @Inject
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays) { public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper) {
this.indicesLifecycle = indicesLifecycle; this.indicesLifecycle = indicesLifecycle;
this.threadPool = threadPool; this.threadPool = threadPool;
this.mapperService = mapperService; this.mapperService = mapperService;
@ -72,6 +78,7 @@ public final class IndexServicesProvider {
this.similarityService = similarityService; this.similarityService = similarityService;
this.factory = factory; this.factory = factory;
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
this.indexSearcherWrapper = indexSearcherWrapper;
} }
public IndicesLifecycle getIndicesLifecycle() { public IndicesLifecycle getIndicesLifecycle() {
@ -129,4 +136,6 @@ public final class IndexServicesProvider {
public BigArrays getBigArrays() { public BigArrays getBigArrays() {
return bigArrays; return bigArrays;
} }
public IndexSearcherWrapper getIndexSearcherWrapper() { return indexSearcherWrapper; }
} }

View File

@ -79,8 +79,6 @@ public abstract class Engine implements Closeable {
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
private final IndexSearcherWrapper searcherWrapper;
protected volatile Throwable failedEngine = null; protected volatile Throwable failedEngine = null;
protected Engine(EngineConfig engineConfig) { protected Engine(EngineConfig engineConfig) {
@ -94,7 +92,6 @@ public abstract class Engine implements Closeable {
engineConfig.getIndexSettings(), engineConfig.getShardId()); engineConfig.getIndexSettings(), engineConfig.getShardId());
this.failedEngineListener = engineConfig.getFailedEngineListener(); this.failedEngineListener = engineConfig.getFailedEngineListener();
this.deletionPolicy = engineConfig.getDeletionPolicy(); this.deletionPolicy = engineConfig.getDeletionPolicy();
this.searcherWrapper = engineConfig.getSearcherWrapper();
} }
/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */ /** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
@ -282,7 +279,7 @@ public abstract class Engine implements Closeable {
try { try {
final Searcher retVal = newSearcher(source, searcher, manager); final Searcher retVal = newSearcher(source, searcher, manager);
success = true; success = true;
return wrap(engineConfig, retVal); return retVal;
} finally { } finally {
if (!success) { if (!success) {
manager.release(searcher); manager.release(searcher);
@ -301,38 +298,6 @@ public abstract class Engine implements Closeable {
} }
} }
/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
* gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned.
*
* This is invoked each time a {@link Searcher} is requested to do an operation. (for example search)
*/
private Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException {
if (searcherWrapper == null) {
return engineSearcher;
}
DirectoryReader reader = searcherWrapper.wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
IndexSearcher indexSearcher = searcherWrapper.wrap(engineConfig, innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
}
};
}
}
/** returns the translog for this engine */ /** returns the translog for this engine */
public abstract Translog getTranslog(); public abstract Translog getTranslog();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@ -380,14 +381,6 @@ public final class EngineConfig {
return queryCachingPolicy; return queryCachingPolicy;
} }
IndexSearcherWrapper getSearcherWrapper() {
return searcherWrapper.get();
}
public void setSearcherWrapper(IndexSearcherWrapper searcherWrapper) {
this.searcherWrapper.set(searcherWrapper);
}
/** /**
* Returns the translog config for this engine * Returns the translog config for this engine
*/ */

View File

@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
/**
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
* and {@link IndexSearcher} managed by the {@link Engine}.
*/
interface IndexSearcherWrapper {
/**
* @param reader The provided directory reader to be wrapped to add custom functionality
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
* the provided directory reader
*/
DirectoryReader wrap(DirectoryReader reader);
/**
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
* when creating a new index searcher
* @param searcher The provided index searcher to be wrapped to add custom functionality
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
* the provided index searcher
*/
IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException;
}

View File

@ -257,7 +257,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
shard.refresh("percolator_load_queries"); shard.refresh("percolator_load_queries");
// NOTE: we acquire the searcher via the engine directly here since this is executed right // NOTE: we acquire the searcher via the engine directly here since this is executed right
// before the shard is marked as POST_RECOVERY // before the shard is marked as POST_RECOVERY
try (Engine.Searcher searcher = shard.engine().acquireSearcher("percolator_load_queries")) { try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries")) {
Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME)); Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService); QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
IndexSearcher indexSearcher = new IndexSearcher(searcher.reader()); IndexSearcher indexSearcher = new IndexSearcher(searcher.reader());

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
/**
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
* and {@link IndexSearcher} managed by the {@link Engine}.
*/
public interface IndexSearcherWrapper {
/**
* @param reader The provided directory reader to be wrapped to add custom functionality
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
* the provided directory reader
*/
DirectoryReader wrap(DirectoryReader reader);
/**
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
* when creating a new index searcher
* @param searcher The provided index searcher to be wrapped to add custom functionality
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
* the provided index searcher
*/
IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException;
/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
* gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
*
* This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
*/
default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) {
DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
}
};
}
}
}

View File

@ -20,10 +20,8 @@
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
@ -188,7 +186,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final IndexShardOperationCounter indexShardOperationCounter; private final IndexShardOperationCounter indexShardOperationCounter;
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); private final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
private final IndexSearcherWrapper searcherWrapper;
@Inject @Inject
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) { public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
@ -244,6 +244,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)); this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false); this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.searcherWrapper = provider.getIndexSearcherWrapper();
} }
public Store store() { public Store store() {
@ -739,7 +740,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public Engine.Searcher acquireSearcher(String source) { public Engine.Searcher acquireSearcher(String source) {
readAllowed(); readAllowed();
return engine().acquireSearcher(source); Engine engine = engine();
return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
} }
public void close(String reason, boolean flushEngine) throws IOException { public void close(String reason, boolean flushEngine) throws IOException {
@ -1167,6 +1169,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
} }
} }
public Translog.View acquireTranslogView() {
Engine engine = engine();
assert engine.getTranslog() != null : "translog must not be null";
return engine.getTranslog().newView();
}
public List<Segment> segments(boolean verbose) {
return engine().segments(verbose);
}
public void flushAndCloseEngine() throws IOException {
engine().flushAndClose();
}
public Translog getTranslog() {
return engine().getTranslog();
}
class EngineRefresher implements Runnable { class EngineRefresher implements Runnable {
@Override @Override
public void run() { public void run() {
@ -1292,7 +1312,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS))); recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
} }
public Engine engine() { Engine engine() {
Engine engine = engineUnsafe(); Engine engine = engineUnsafe();
if (engine == null) { if (engine == null) {
throw new EngineClosedException(shardId); throw new EngineClosedException(shardId);
@ -1507,4 +1527,5 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
} }
return false; return false;
} }
} }

View File

@ -264,7 +264,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
} }
final Translog translog; final Translog translog;
try { try {
translog = indexShard.engine().getTranslog(); translog = indexShard.getTranslog();
} catch (EngineClosedException e) { } catch (EngineClosedException e) {
// not ready yet to be checked for activity // not ready yet to be checked for activity
return null; return null;

View File

@ -120,9 +120,7 @@ public class RecoverySourceHandler {
* performs the recovery from the local engine to the target * performs the recovery from the local engine to the target
*/ */
public RecoveryResponse recoverToTarget() { public RecoveryResponse recoverToTarget() {
final Engine engine = shard.engine(); try (Translog.View translogView = shard.acquireTranslogView()) {
assert engine.getTranslog() != null : "translog must not be null";
try (Translog.View translogView = engine.getTranslog().newView()) {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration()); logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot; final IndexCommit phase1Snapshot;
try { try {
@ -179,7 +177,7 @@ public class RecoverySourceHandler {
try { try {
recoverySourceMetadata = store.getMetadata(snapshot); recoverySourceMetadata = store.getMetadata(snapshot);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
shard.engine().failEngine("recovery", ex); shard.failShard("recovery", ex);
throw ex; throw ex;
} }
for (String name : snapshot.getFileNames()) { for (String name : snapshot.getFileNames()) {
@ -287,7 +285,7 @@ public class RecoverySourceHandler {
for (StoreFileMetaData md : metadata) { for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md); logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
shard.engine().failEngine("recovery", corruptIndexException); shard.failShard("recovery", corruptIndexException);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
throw corruptIndexException; throw corruptIndexException;
} }
@ -641,7 +639,7 @@ public class RecoverySourceHandler {
} }
protected void failEngine(IOException cause) { protected void failEngine(IOException cause) {
shard.engine().failEngine("recovery", cause); shard.failShard("recovery", cause);
} }
Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) { Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) {

View File

@ -52,7 +52,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
// if we relocate we need to close the engine in order to open a new // if we relocate we need to close the engine in order to open a new
// IndexWriter on the other end of the relocation // IndexWriter on the other end of the relocation
engineClosed = true; engineClosed = true;
shard.engine().flushAndClose(); shard.flushAndCloseEngine();
} catch (IOException e) { } catch (IOException e) {
logger.warn("close engine failed", e); logger.warn("close engine failed", e);
shard.failShard("failed to close engine (phase1)", e); shard.failShard("failed to close engine (phase1)", e);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LiveIndexWriterConfig;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.EngineAccess;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,7 +34,7 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
public void testSettingsUpdate() { public void testSettingsUpdate() {
final IndexService service = createIndex("foo"); final IndexService service = createIndex("foo");
// INDEX_COMPOUND_ON_FLUSH // INDEX_COMPOUND_ON_FLUSH
InternalEngine engine = ((InternalEngine)engine(service)); InternalEngine engine = ((InternalEngine) EngineAccess.engine(service.shard(0)));
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true)); assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
client().admin().indices().prepareUpdateSettings("foo").setSettings(Settings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build()).get(); client().admin().indices().prepareUpdateSettings("foo").setSettings(Settings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(false)); assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(false));

View File

@ -67,10 +67,7 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.similarity.SimilarityLookupService; import org.elasticsearch.index.similarity.SimilarityLookupService;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.DirectoryUtils;
@ -491,8 +488,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))) assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY)));
;
} }
@Test @Test
@ -516,9 +512,9 @@ public class InternalEngineTests extends ESTestCase {
Path translog = createTempDir("translog-test"); Path translog = createTempDir("translog-test");
InternalEngine engine = createEngine(store, translog); InternalEngine engine = createEngine(store, translog);
engine.close(); engine.close();
engine.config().setSearcherWrapper(wrapper);
engine = new InternalEngine(engine.config(), false); engine = new InternalEngine(engine.config(), false);
Engine.Searcher searcher = engine.acquireSearcher("test"); Engine.Searcher searcher = wrapper.wrap(engine.config(), engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2)); assertThat(counter.get(), equalTo(2));
searcher.close(); searcher.close();
IOUtils.close(store, engine); IOUtils.close(store, engine);

View File

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.index.engine.Engine;
/**
* Test utility to access the engine of a shard
*/
public final class EngineAccess {
public static Engine engine(IndexShard shard) {
return shard.engine();
}
}

View File

@ -215,10 +215,6 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
return instanceFromNode.indexServiceSafe(index); return instanceFromNode.indexServiceSafe(index);
} }
protected static org.elasticsearch.index.engine.Engine engine(IndexService service) {
return service.shard(0).engine();
}
/** /**
* Create a new search context. * Create a new search context.
*/ */

View File

@ -1048,7 +1048,7 @@ public final class InternalTestCluster extends TestCluster {
for (IndexService indexService : indexServices) { for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
try { try {
CommitStats commitStats = indexShard.engine().commitStats(); CommitStats commitStats = indexShard.commitStats();
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) { if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs(); long liveDocsOnShard = commitStats.getNumDocs();