Load percolator queries before shard is marked POST_RECOVERY

If we mark the shard as being in POST_RECOVERY before the percolator
is fully set up we might expose it to the user as fully searchable before
all queries are loaded. This can lead to wrong results especially in tests
when a shard is concurrently marked as STARTED.

This commit also removes unneded abstractions on IndexShard where readoperations
should be allowed when the purose is a write.
This commit is contained in:
Simon Willnauer 2015-06-22 13:10:42 +02:00
parent 55c55677a3
commit 1f3670733a
5 changed files with 29 additions and 24 deletions

View File

@ -133,7 +133,6 @@ public class TransportFieldStatsTransportAction extends TransportBroadcastAction
IndexService indexServices = indicesService.indexServiceSafe(shardId.getIndex());
MapperService mapperService = indexServices.mapperService();
IndexShard shard = indexServices.shardSafe(shardId.id());
shard.readAllowed();
try (Engine.Searcher searcher = shard.acquireSearcher("fieldstats")) {
for (String field : request.getFields()) {
MappedFieldType fieldType = mapperService.fullName(field);

View File

@ -257,7 +257,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
}
@Override
public void afterIndexShardPostRecovery(IndexShard indexShard) {
public void beforeIndexShardPostRecovery(IndexShard indexShard) {
if (hasPercolatorType(indexShard)) {
// percolator index has started, fetch what we can from it and initialize the indices
// we have
@ -274,8 +274,9 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
private int loadQueries(IndexShard shard) {
shard.refresh("percolator_load_queries");
// Maybe add a mode load? This isn't really a write. We need write b/c state=post_recovery
try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries", true)) {
// NOTE: we acquire the searcher via the engine directly here since this is executed right
// before the shard is marked as POST_RECOVERY
try (Engine.Searcher searcher = shard.engine().acquireSearcher("percolator_load_queries")) {
Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME));
QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService);
searcher.searcher().search(query, queryCollector);

View File

@ -742,11 +742,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public Engine.Searcher acquireSearcher(String source) {
return acquireSearcher(source, false);
}
public Engine.Searcher acquireSearcher(String source, boolean searcherForWriteOperation) {
readAllowed(searcherForWriteOperation);
readAllowed();
return engine().acquireSearcher(source);
}
@ -776,6 +772,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
indicesLifecycle.beforeIndexShardPostRecovery(this);
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
@ -907,20 +904,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public void readAllowed() throws IllegalIndexShardStateException {
readAllowed(false);
}
private void readAllowed(boolean writeOperation) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (writeOperation) {
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
}
} else {
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
}
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
}
}

View File

@ -97,9 +97,16 @@ public interface IndicesLifecycle {
}
public void afterIndexShardPostRecovery(IndexShard indexShard) {
/**
* Called right after the shard is moved into POST_RECOVERY mode
*/
public void afterIndexShardPostRecovery(IndexShard indexShard) {}
}
/**
* Called right before the shard is moved into POST_RECOVERY mode.
* The shard is ready to be used but not yet marked as POST_RECOVERY.
*/
public void beforeIndexShardPostRecovery(IndexShard indexShard) {}
/**
* Called after the index shard has been started.

View File

@ -121,6 +121,18 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
public void beforeIndexShardPostRecovery(IndexShard indexShard) {
for (Listener listener : listeners) {
try {
listener.beforeIndexShardPostRecovery(indexShard);
} catch (Throwable t) {
logger.warn("{} failed to invoke before shard post recovery callback", t, indexShard.shardId());
throw t;
}
}
}
public void afterIndexShardPostRecovery(IndexShard indexShard) {
for (Listener listener : listeners) {
try {