[CORE] Rename START phase into VERIFY_INDEX

we really only optionally run checkindex in this phase and moving
the engine start into translog is move cleaner code wise.

Closes #10570
This commit is contained in:
Simon Willnauer 2015-04-13 16:57:26 +02:00
parent fe411a9295
commit b756477fb4
7 changed files with 51 additions and 76 deletions

View File

@ -35,8 +35,8 @@
- gte: { test_1.shards.0.translog.total: -1 }
- gte: { test_1.shards.0.translog.total_on_start: 0 }
- gte: { test_1.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_1.shards.0.start.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.start.total_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test index name not matching":

View File

@ -149,11 +149,8 @@ public class InternalEngine extends Engine {
long nextTranslogID = translogId.v2();
translog.newTranslog(nextTranslogID);
translogIdGenerator.set(nextTranslogID);
if (skipInitialTranslogRecovery == false) {
transformer.beginTranslogRecovery();
if (translogId.v1() != null) {
if (translogId.v1() != null && skipInitialTranslogRecovery == false) {
recoverFromTranslog(translogId.v1(), transformer);
}
} else {
flush(true, true);
}

View File

@ -130,8 +130,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
.append(new ByteSizeValue(index.recoveredBytes())).append("]\n");
sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.reusedBytes())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
.append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());

View File

@ -92,7 +92,6 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
@ -811,11 +810,13 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.setStage(RecoveryState.Stage.START);
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
checkIndex();
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
final EngineConfig config = newEngineConfig();
@ -834,8 +835,6 @@ public class IndexShard extends AbstractIndexShardComponent {
Set<String> recoveredTypes = internalPerformTranslogRecovery(true);
assert recoveredTypes.isEmpty();
assert recoveryState.getTranslog().recoveredOperations() == 0;
assert recoveryState.getStage() == RecoveryState.Stage.START : "START stage expected but was: " + recoveryState.getStage();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
/** called if recovery has to be restarted after network error / delay ** */
@ -1178,7 +1177,7 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.debug("check index [success]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
}
recoveryState.getStart().checkIndexTime(Math.max(0, System.currentTimeMillis() - time));
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, System.currentTimeMillis() - time));
}
public Engine engine() {
@ -1281,18 +1280,12 @@ public class IndexShard extends AbstractIndexShardComponent {
}
protected EngineConfig newEngineConfig() {
TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
@Override
protected void operationProcessed() {
assert recoveryState != null;
recoveryState.getTranslog().incrementRecoveredOperations();
}
@Override
public void beginTranslogRecovery() {
assert recoveryState != null;
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
};
return new EngineConfig(shardId,
indexSettingsService.getSettings().getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),

View File

@ -18,30 +18,19 @@
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.util.HashSet;
import java.util.Set;
@ -154,10 +143,6 @@ public class TranslogRecoveryPerformer {
// noop
}
public void beginTranslogRecovery() {
// noop
}
/**
* Returns the recovered types modifying the mapping during the recovery
*/

View File

@ -52,10 +52,10 @@ public class RecoveryState implements ToXContent, Streamable {
/** recovery of lucene files, either reusing local ones are copying new ones */
INDEX((byte) 1),
/** starting up the engine, potentially running checks */
START((byte) 2),
/** potentially running check index */
VERIFY_INDEX((byte) 2),
/** replaying the translog */
/** starting up the engine, replaying the translog */
TRANSLOG((byte) 3),
/** performing final task after all translog ops have been done */
@ -127,7 +127,7 @@ public class RecoveryState implements ToXContent, Streamable {
private final Index index = new Index();
private final Translog translog = new Translog();
private final Start start = new Start();
private final VerifyIndex verifyIndex = new VerifyIndex();
private final Timer timer = new Timer();
private Type type;
@ -183,21 +183,21 @@ public class RecoveryState implements ToXContent, Streamable {
// reinitializing stop remove all state except for start time
this.stage = Stage.INIT;
getIndex().reset();
getStart().reset();
getVerifyIndex().reset();
getTranslog().reset();
break;
case INDEX:
validateAndSetStage(Stage.INIT, stage);
getIndex().start();
break;
case START:
case VERIFY_INDEX:
validateAndSetStage(Stage.INDEX, stage);
getIndex().stop();
getStart().start();
getVerifyIndex().start();
break;
case TRANSLOG:
validateAndSetStage(Stage.START, stage);
getStart().stop();
validateAndSetStage(Stage.VERIFY_INDEX, stage);
getVerifyIndex().stop();
getTranslog().start();
break;
case FINALIZE:
@ -218,8 +218,8 @@ public class RecoveryState implements ToXContent, Streamable {
return index;
}
public Start getStart() {
return this.start;
public VerifyIndex getVerifyIndex() {
return this.verifyIndex;
}
public Translog getTranslog() {
@ -269,7 +269,7 @@ public class RecoveryState implements ToXContent, Streamable {
}
index.readFrom(in);
translog.readFrom(in);
start.readFrom(in);
verifyIndex.readFrom(in);
primary = in.readBoolean();
}
@ -287,7 +287,7 @@ public class RecoveryState implements ToXContent, Streamable {
}
index.writeTo(out);
translog.writeTo(out);
start.writeTo(out);
verifyIndex.writeTo(out);
out.writeBoolean(primary);
}
@ -333,8 +333,8 @@ public class RecoveryState implements ToXContent, Streamable {
translog.toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.START);
start.toXContent(builder, params);
builder.startObject(Fields.VERIFY_INDEX);
verifyIndex.toXContent(builder, params);
builder.endObject();
return builder;
@ -360,7 +360,7 @@ public class RecoveryState implements ToXContent, Streamable {
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog");
static final XContentBuilderString TOTAL_ON_START = new XContentBuilderString("total_on_start");
static final XContentBuilderString START = new XContentBuilderString("start");
static final XContentBuilderString VERIFY_INDEX = new XContentBuilderString("verify_index");
static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered");
static final XContentBuilderString RECOVERED_IN_BYTES = new XContentBuilderString("recovered_in_bytes");
static final XContentBuilderString CHECK_INDEX_TIME = new XContentBuilderString("check_index_time");
@ -440,7 +440,7 @@ public class RecoveryState implements ToXContent, Streamable {
}
public static class Start extends Timer implements ToXContent, Streamable {
public static class VerifyIndex extends Timer implements ToXContent, Streamable {
private volatile long checkIndexTime;

View File

@ -121,11 +121,11 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
}
};
} else if (randomBoolean()) {
timer = new Start();
timer = new VerifyIndex();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj() {
return new Start();
return new VerifyIndex();
}
};
} else {
@ -434,20 +434,20 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
}
public void testStart() throws IOException {
final Start start = new Start();
final VerifyIndex verifyIndex = new VerifyIndex();
AtomicBoolean stop = new AtomicBoolean();
Streamer<Start> streamer = new Streamer<Start>(stop, start) {
Streamer<VerifyIndex> streamer = new Streamer<VerifyIndex>(stop, verifyIndex) {
@Override
Start createObj() {
return new Start();
VerifyIndex createObj() {
return new VerifyIndex();
}
};
// we don't need to test the time aspect, it's done in the timer test
start.start();
assertThat(start.checkIndexTime(), equalTo(0l));
verifyIndex.start();
assertThat(verifyIndex.checkIndexTime(), equalTo(0l));
// force one
Start lastRead = streamer.serializeDeserialize();
VerifyIndex lastRead = streamer.serializeDeserialize();
assertThat(lastRead.checkIndexTime(), equalTo(0l));
long took = randomLong();
@ -456,30 +456,30 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
took = Math.max(0l, took);
}
start.checkIndexTime(took);
assertThat(start.checkIndexTime(), equalTo(took));
verifyIndex.checkIndexTime(took);
assertThat(verifyIndex.checkIndexTime(), equalTo(took));
boolean stopped = false;
if (randomBoolean()) {
start.stop();
verifyIndex.stop();
stopped = true;
}
if (randomBoolean()) {
start.reset();
verifyIndex.reset();
took = 0;
assertThat(start.checkIndexTime(), equalTo(took));
assertThat(verifyIndex.checkIndexTime(), equalTo(took));
}
lastRead = streamer.serializeDeserialize();
assertThat(lastRead.checkIndexTime(), equalTo(took));
assertThat(lastRead.startTime(), equalTo(start.startTime()));
assertThat(lastRead.stopTime(), equalTo(start.stopTime()));
assertThat(lastRead.startTime(), equalTo(verifyIndex.startTime()));
assertThat(lastRead.stopTime(), equalTo(verifyIndex.stopTime()));
if (stopped) {
assertThat(lastRead.time(), equalTo(start.time()));
assertThat(lastRead.time(), equalTo(verifyIndex.time()));
} else {
assertThat(lastRead.time(), lessThanOrEqualTo(start.time()));
assertThat(lastRead.time(), lessThanOrEqualTo(verifyIndex.time()));
}
}