diff --git a/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java index 6d3d3396984..c72aad3f718 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java @@ -36,7 +36,7 @@ import java.io.IOException; */ public class RefreshRequest extends BroadcastOperationRequest { - private boolean waitForOperations = true; + private boolean force = true; RefreshRequest() { } @@ -45,22 +45,26 @@ public class RefreshRequest extends BroadcastOperationRequest { super(indices); } - public boolean waitForOperations() { - return waitForOperations; + public boolean force() { + return force; } - public RefreshRequest waitForOperations(boolean waitForOperations) { - this.waitForOperations = waitForOperations; + /** + * Forces calling refresh, overriding the check that dirty operations even happened. Defaults + * to true (note, still lightweight if no refresh is needed). + */ + public RefreshRequest force(boolean force) { + this.force = force; return this; } public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - waitForOperations = in.readBoolean(); + force = in.readBoolean(); } public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(waitForOperations); + out.writeBoolean(force); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequestBuilder.java index 3f42db9f08d..b8d8ed46e9d 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequestBuilder.java @@ -35,8 +35,12 @@ public class RefreshRequestBuilder extends BroadcastOperationRequestBuilder { - private final IndicesService indicesService; + public static boolean REFRESH_FORCE = false; + private final IndicesService indicesService; private final boolean realtime; @Inject @@ -96,7 +97,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction entry : versionMap.entrySet()) { HashedBytesRef uid = entry.getKey(); synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? @@ -992,7 +992,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { flush(new Flush().force(true)); } if (optimize.refresh()) { - refresh(new Refresh(false).force(true)); + refresh(new Refresh().force(true)); } } diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 89f72dbcc4e..11d93f84a41 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -183,7 +183,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem indexShard.start("post recovery from gateway"); } // refresh the shard - indexShard.refresh(new Engine.Refresh(false)); + indexShard.refresh(new Engine.Refresh().force(true)); recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime()); recoveryStatus.updateStage(RecoveryStatus.Stage.DONE); diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index 5a1095e7c16..61dccba4e42 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -226,7 +226,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent { private void loadQueries(IndexShard shard) { try { - shard.refresh(new Engine.Refresh(true)); + shard.refresh(new Engine.Refresh().force(true)); Engine.Searcher searcher = shard.searcher(); try { Query query = new XConstantScoreQuery( diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 7fb224a6146..31594797cb7 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -29,6 +29,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -238,22 +239,31 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this.shardRouting; } - public InternalIndexShard routingEntry(ShardRouting shardRouting) { + public InternalIndexShard routingEntry(ShardRouting newRouting) { ShardRouting currentRouting = this.shardRouting; - if (!shardRouting.shardId().equals(shardId())) { - throw new ElasticSearchIllegalArgumentException("Trying to set a routing entry with shardId [" + shardRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); + if (!newRouting.shardId().equals(shardId())) { + throw new ElasticSearchIllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); } if (currentRouting != null) { - if (!shardRouting.primary() && currentRouting.primary()) { + if (!newRouting.primary() && currentRouting.primary()) { logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode"); } // if its the same routing, return - if (currentRouting.equals(shardRouting)) { + if (currentRouting.equals(newRouting)) { return this; } } - this.shardRouting = shardRouting; - indicesLifecycle.shardRoutingChanged(this, currentRouting, shardRouting); + + // make sure we refresh on state change due to cluster state changes + if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) { + try { + engine.refresh(new Engine.Refresh().force(true)); + } catch (Throwable t) { + logger.debug("failed to refresh due to move to cluster wide started", t); + } + } + this.shardRouting = newRouting; + indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting); return this; } @@ -642,7 +652,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } // clear unreferenced files translog.clearUnreferenced(); - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(true)); synchronized (mutex) { logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED); state = IndexShardState.STARTED; @@ -805,7 +815,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I public void run() { try { if (engine.refreshNeeded()) { - refresh(new Engine.Refresh(false)); + refresh(new Engine.Refresh().force(false)); } } catch (EngineClosedException e) { // we are being closed, ignore diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/refresh/RestRefreshAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/refresh/RestRefreshAction.java index 94cb39e6723..c5a84fbb75c 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/refresh/RestRefreshAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/refresh/RestRefreshAction.java @@ -58,6 +58,7 @@ public class RestRefreshAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { RefreshRequest refreshRequest = new RefreshRequest(RestActions.splitIndices(request.param("index"))); refreshRequest.listenerThreaded(false); + refreshRequest.force(request.paramAsBoolean("force", refreshRequest.force())); if (request.hasParam("ignore_indices")) { refreshRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices"))); } diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/RobinEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/RobinEngineTests.java index 4a0851dcb60..aede1c389fc 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/RobinEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/RobinEngineTests.java @@ -100,9 +100,9 @@ public class RobinEngineTests extends ElasticsearchTestCase { private IndexSettingsService engineSettingsService; private IndexSettingsService replicaSettingsService; - + private Settings defaultSettings; - + @Before public void setUp() throws Exception { @@ -194,6 +194,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { return new RobinEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index())); } + protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); @@ -210,7 +211,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); segments = engine.segments(); assertThat(segments.size(), equalTo(1)); @@ -229,12 +230,12 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(segments.get(0).getNumDocs(), equalTo(2)); assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); - + engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, false).build()); ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); segments = engine.segments(); assertThat(segments.size(), equalTo(2)); @@ -254,7 +255,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { engine.delete(new Engine.Delete("test", "1", newUid("1"))); - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); segments = engine.segments(); assertThat(segments.size(), equalTo(2)); @@ -270,12 +271,12 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(segments.get(1).getNumDocs(), equalTo(1)); assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); assertThat(segments.get(1).isCompound(), equalTo(false)); - - engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build()); + + engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build()); ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false); engine.create(new Engine.Create(null, newUid("4"), doc4)); - engine.refresh(new Engine.Refresh(true)); - + engine.refresh(new Engine.Refresh().force(false)); + segments = engine.segments(); assertThat(segments.size(), equalTo(3)); assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true)); @@ -290,14 +291,14 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(segments.get(1).getNumDocs(), equalTo(1)); assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); assertThat(segments.get(1).isCompound(), equalTo(false)); - + assertThat(segments.get(2).isCommitted(), equalTo(false)); assertThat(segments.get(2).isSearch(), equalTo(true)); assertThat(segments.get(2).getNumDocs(), equalTo(1)); assertThat(segments.get(2).getDeletedDocs(), equalTo(0)); assertThat(segments.get(2).isCompound(), equalTo(true)); } - + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.searcher(); @@ -327,7 +328,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(getResult.exists(), equalTo(false)); // refresh and it should be there - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); // now its there... searchResult = engine.searcher(); @@ -361,7 +362,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(getResult.docIdAndVersion(), nullValue()); // refresh and it should be updated - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); searchResult = engine.searcher(); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); @@ -384,7 +385,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { assertThat(getResult.exists(), equalTo(false)); // refresh and it should be deleted - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); searchResult = engine.searcher(); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); @@ -406,7 +407,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { searchResult.release(); // refresh and it should be there - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); // now its there... searchResult = engine.searcher(); @@ -440,7 +441,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { searchResult.release(); // refresh and it should be updated - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); searchResult = engine.searcher(); MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1)); @@ -468,7 +469,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { searchResult.release(); // refresh and it should be there - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); // now its there... searchResult = engine.searcher(); @@ -478,7 +479,7 @@ public class RobinEngineTests extends ElasticsearchTestCase { // delete, refresh and do a new search, it should not be there engine.delete(new Engine.Delete("test", "1", newUid("1"))); - engine.refresh(new Engine.Refresh(true)); + engine.refresh(new Engine.Refresh().force(false)); Engine.Searcher updateSearchResult = engine.searcher(); MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); updateSearchResult.release();