In the delete by query also acquire a searcher when shard state is RECOVERING and POST_RECOVERING
This commit is contained in:
parent
f2d75654bf
commit
ec6539df37
|
@ -131,7 +131,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
|
||||||
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
|
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
|
||||||
|
|
||||||
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
|
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
|
||||||
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler));
|
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler));
|
||||||
try {
|
try {
|
||||||
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types())
|
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types())
|
||||||
.origin(Engine.Operation.Origin.REPLICA);
|
.origin(Engine.Operation.Origin.REPLICA);
|
||||||
|
|
|
@ -147,8 +147,15 @@ public interface IndexShard extends IndexShardComponent {
|
||||||
|
|
||||||
Engine.Searcher acquireSearcher(String source);
|
Engine.Searcher acquireSearcher(String source);
|
||||||
|
|
||||||
|
Engine.Searcher acquireSearcher(String source, Mode mode);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns <tt>true</tt> if this shard can ignore a recovery attempt made to it (since the already doing/done it)
|
* Returns <tt>true</tt> if this shard can ignore a recovery attempt made to it (since the already doing/done it)
|
||||||
*/
|
*/
|
||||||
public boolean ignoreRecoveryAttempt();
|
public boolean ignoreRecoveryAttempt();
|
||||||
|
|
||||||
|
public enum Mode {
|
||||||
|
READ,
|
||||||
|
WRITE
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -600,7 +600,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Engine.Searcher acquireSearcher(String source) {
|
public Engine.Searcher acquireSearcher(String source) {
|
||||||
readAllowed();
|
return acquireSearcher(source, Mode.READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Engine.Searcher acquireSearcher(String source, Mode mode) {
|
||||||
|
readAllowed(mode);
|
||||||
return engine.acquireSearcher(source);
|
return engine.acquireSearcher(source);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -756,9 +761,23 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readAllowed() throws IllegalIndexShardStateException {
|
public void readAllowed() throws IllegalIndexShardStateException {
|
||||||
|
readAllowed(Mode.READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void readAllowed(Mode mode) throws IllegalIndexShardStateException {
|
||||||
IndexShardState state = this.state; // one time volatile read
|
IndexShardState state = this.state; // one time volatile read
|
||||||
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
|
switch (mode) {
|
||||||
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
|
case READ:
|
||||||
|
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
|
||||||
|
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case WRITE:
|
||||||
|
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
|
||||||
|
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.deleteByQuery;
|
package org.elasticsearch.deleteByQuery;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
|
||||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
|
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
|
||||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
|
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
@ -35,7 +34,6 @@ import static org.elasticsearch.test.hamcrest.ElasticSearchAssertions.assertHitC
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
@LuceneTestCase.AwaitsFix(bugUrl = "Martijn is working on a fix for a failure here caused by a shard reject the delete because it's in a POST_RECOVERY or RECOVERY state, because we now acquire a searcher during a delete by query shard operation.")
|
|
||||||
public class DeleteByQueryTests extends AbstractIntegrationTest {
|
public class DeleteByQueryTests extends AbstractIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue