Improve refresh logic when replica move to started

closes #3573
This commit is contained in:
Shay Banon 2013-08-26 15:15:01 +02:00
parent b329943632
commit 8b295b53d0
17 changed files with 85 additions and 75 deletions

View File

@ -36,7 +36,7 @@ import java.io.IOException;
*/
public class RefreshRequest extends BroadcastOperationRequest<RefreshRequest> {
private boolean waitForOperations = true;
private boolean force = true;
RefreshRequest() {
}
@ -45,22 +45,26 @@ public class RefreshRequest extends BroadcastOperationRequest<RefreshRequest> {
super(indices);
}
public boolean waitForOperations() {
return waitForOperations;
public boolean force() {
return force;
}
public RefreshRequest waitForOperations(boolean waitForOperations) {
this.waitForOperations = waitForOperations;
/**
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
* to true (note, still lightweight if no refresh is needed).
*/
public RefreshRequest force(boolean force) {
this.force = force;
return this;
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForOperations = in.readBoolean();
force = in.readBoolean();
}
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForOperations);
out.writeBoolean(force);
}
}

View File

@ -35,8 +35,12 @@ public class RefreshRequestBuilder extends BroadcastOperationRequestBuilder<Refr
super((InternalIndicesAdminClient) indicesClient, new RefreshRequest());
}
public RefreshRequestBuilder setWaitForOperations(boolean waitForOperations) {
request.waitForOperations(waitForOperations);
/**
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
* to true (note, still lightweight if no refresh is needed).
*/
public RefreshRequestBuilder setForce(boolean force) {
request.force(force);
return this;
}

View File

@ -30,34 +30,29 @@ import java.io.IOException;
*/
class ShardRefreshRequest extends BroadcastShardOperationRequest {
private boolean waitForOperations = true;
private boolean force = true;
ShardRefreshRequest() {
}
public ShardRefreshRequest(String index, int shardId, RefreshRequest request) {
super(index, shardId, request);
waitForOperations = request.waitForOperations();
force = request.force();
}
public boolean waitForOperations() {
return waitForOperations;
}
public ShardRefreshRequest waitForOperations(boolean waitForOperations) {
this.waitForOperations = waitForOperations;
return this;
public boolean force() {
return force;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForOperations = in.readBoolean();
force = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForOperations);
out.writeBoolean(force);
}
}

View File

@ -112,7 +112,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
@Override
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.refresh(new Engine.Refresh(request.waitForOperations()));
indexShard.refresh(new Engine.Refresh().force(request.force()));
return new ShardRefreshResponse(request.index(), request.shardId());
}

View File

@ -338,7 +338,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Throwable e) {
// ignore
}
@ -553,7 +553,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Throwable e) {
// ignore
}

View File

@ -187,7 +187,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}
@ -208,7 +208,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}

View File

@ -100,7 +100,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}
@ -121,7 +121,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}

View File

@ -41,8 +41,9 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportGetAction extends TransportShardSingleOperationAction<GetRequest, GetResponse> {
private final IndicesService indicesService;
public static boolean REFRESH_FORCE = false;
private final IndicesService indicesService;
private final boolean realtime;
@Inject
@ -96,7 +97,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
IndexShard indexShard = indexService.shardSafe(shardId);
if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(REFRESH_FORCE));
}
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),

View File

@ -105,7 +105,7 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
IndexShard indexShard = indexService.shardSafe(shardId);
if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(TransportGetAction.REFRESH_FORCE));
}
MultiGetShardResponse response = new MultiGetShardResponse();

View File

@ -218,7 +218,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}
@ -252,7 +252,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
// ignore
}

View File

@ -185,14 +185,12 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Refresh {
private final boolean waitForOperations;
private boolean force = false;
public Refresh(boolean waitForOperations) {
this.waitForOperations = waitForOperations;
}
/**
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
* to true (note, still lightweight if no refresh is needed).
*/
public Refresh force(boolean force) {
this.force = force;
return this;
@ -202,13 +200,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.force;
}
public boolean waitForOperations() {
return waitForOperations;
}
@Override
public String toString() {
return "waitForOperations[" + waitForOperations + "]";
return "force[" + force + "]";
}
}

View File

@ -901,7 +901,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh(true).force(true));
refresh(new Refresh().force(true));
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
HashedBytesRef uid = entry.getKey();
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
@ -992,7 +992,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
flush(new Flush().force(true));
}
if (optimize.refresh()) {
refresh(new Refresh(false).force(true));
refresh(new Refresh().force(true));
}
}

View File

@ -183,7 +183,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
indexShard.start("post recovery from gateway");
}
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));
indexShard.refresh(new Engine.Refresh().force(true));
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);

View File

@ -226,7 +226,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
private void loadQueries(IndexShard shard) {
try {
shard.refresh(new Engine.Refresh(true));
shard.refresh(new Engine.Refresh().force(true));
Engine.Searcher searcher = shard.searcher();
try {
Query query = new XConstantScoreQuery(

View File

@ -29,6 +29,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -238,22 +239,31 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.shardRouting;
}
public InternalIndexShard routingEntry(ShardRouting shardRouting) {
public InternalIndexShard routingEntry(ShardRouting newRouting) {
ShardRouting currentRouting = this.shardRouting;
if (!shardRouting.shardId().equals(shardId())) {
throw new ElasticSearchIllegalArgumentException("Trying to set a routing entry with shardId [" + shardRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
if (!newRouting.shardId().equals(shardId())) {
throw new ElasticSearchIllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
}
if (currentRouting != null) {
if (!shardRouting.primary() && currentRouting.primary()) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(shardRouting)) {
if (currentRouting.equals(newRouting)) {
return this;
}
}
this.shardRouting = shardRouting;
indicesLifecycle.shardRoutingChanged(this, currentRouting, shardRouting);
// make sure we refresh on state change due to cluster state changes
if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) {
try {
engine.refresh(new Engine.Refresh().force(true));
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
}
this.shardRouting = newRouting;
indicesLifecycle.shardRoutingChanged(this, currentRouting, newRouting);
return this;
}
@ -642,7 +652,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
// clear unreferenced files
translog.clearUnreferenced();
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(true));
synchronized (mutex) {
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
@ -805,7 +815,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void run() {
try {
if (engine.refreshNeeded()) {
refresh(new Engine.Refresh(false));
refresh(new Engine.Refresh().force(false));
}
} catch (EngineClosedException e) {
// we are being closed, ignore

View File

@ -58,6 +58,7 @@ public class RestRefreshAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
RefreshRequest refreshRequest = new RefreshRequest(RestActions.splitIndices(request.param("index")));
refreshRequest.listenerThreaded(false);
refreshRequest.force(request.paramAsBoolean("force", refreshRequest.force()));
if (request.hasParam("ignore_indices")) {
refreshRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));
}

View File

@ -100,9 +100,9 @@ public class RobinEngineTests extends ElasticsearchTestCase {
private IndexSettingsService engineSettingsService;
private IndexSettingsService replicaSettingsService;
private Settings defaultSettings;
@Before
public void setUp() throws Exception {
@ -194,6 +194,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
return new RobinEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
}
protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
@ -210,7 +211,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
segments = engine.segments();
assertThat(segments.size(), equalTo(1));
@ -229,12 +230,12 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, false).build());
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
@ -254,7 +255,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
@ -270,12 +271,12 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
segments = engine.segments();
assertThat(segments.size(), equalTo(3));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
@ -290,14 +291,14 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
assertThat(segments.get(2).isCommitted(), equalTo(false));
assertThat(segments.get(2).isSearch(), equalTo(true));
assertThat(segments.get(2).getNumDocs(), equalTo(1));
assertThat(segments.get(2).getDeletedDocs(), equalTo(0));
assertThat(segments.get(2).isCompound(), equalTo(true));
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.searcher();
@ -327,7 +328,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(getResult.exists(), equalTo(false));
// refresh and it should be there
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
// now its there...
searchResult = engine.searcher();
@ -361,7 +362,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(getResult.docIdAndVersion(), nullValue());
// refresh and it should be updated
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
searchResult = engine.searcher();
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
@ -384,7 +385,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
assertThat(getResult.exists(), equalTo(false));
// refresh and it should be deleted
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
searchResult = engine.searcher();
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
@ -406,7 +407,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
searchResult.release();
// refresh and it should be there
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
// now its there...
searchResult = engine.searcher();
@ -440,7 +441,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
searchResult.release();
// refresh and it should be updated
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
searchResult = engine.searcher();
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
@ -468,7 +469,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
searchResult.release();
// refresh and it should be there
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
// now its there...
searchResult = engine.searcher();
@ -478,7 +479,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
// delete, refresh and do a new search, it should not be there
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh(true));
engine.refresh(new Engine.Refresh().force(false));
Engine.Searcher updateSearchResult = engine.searcher();
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
updateSearchResult.release();