Recovery from local gateway should re-introduce new mappings

The delayed mapping intro tests exposed a bug where if a new mapping is introduced, yet not updated on the master, and a full restart occurs, reply of the transaction log will not cause the new mapping to be re-introduced.
closes #6659

add comment on the method
This commit is contained in:
Shay Banon 2014-07-01 01:08:36 +02:00
parent e8519084c9
commit 46f1e30fa9
4 changed files with 71 additions and 73 deletions

View File

@ -24,13 +24,16 @@ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
@ -57,7 +60,8 @@ import java.util.concurrent.ScheduledFuture;
public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final ThreadPool threadPool;
private final MappingUpdatedAction mappingUpdatedAction;
private final IndexService indexService;
private final InternalIndexShard indexShard;
private final RecoveryState recoveryState = new RecoveryState();
@ -66,9 +70,12 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
private final TimeValue syncInterval;
@Inject
public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard) {
public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, MappingUpdatedAction mappingUpdatedAction,
IndexService indexService, IndexShard indexShard) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.mappingUpdatedAction = mappingUpdatedAction;
this.indexService = indexService;
this.indexShard = (InternalIndexShard) indexShard;
syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5));
@ -224,7 +231,10 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
break;
}
try {
indexShard.performRecoveryOperation(operation);
Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation);
if (potentialIndexOperation != null) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), potentialIndexOperation.docMapper(), indexService.indexUUID());
}
recoveryState.getTranslog().addTranslogOperations(1);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {

View File

@ -736,24 +736,33 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.enableGcDeletes(true);
}
public void performRecoveryOperation(Translog.Operation operation) throws ElasticsearchException {
/**
* Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
* that can then be used for mapping updates (for example) if needed.
*/
public Engine.IndexingOperation performRecoveryOperation(Translog.Operation operation) throws ElasticsearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Engine.IndexingOperation indexOperation = null;
try {
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(
Engine.Create engineCreate = prepareCreate(
source(create.source()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false));
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
engine.create(engineCreate);
indexOperation = engineCreate;
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true));
Engine.Index engineIndex = prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
engine.index(engineIndex);
indexOperation = engineIndex;
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
@ -786,6 +795,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
throw e;
}
}
return indexOperation;
}
/**

View File

@ -39,18 +39,20 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
/**
*
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@Slow
public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
@ -68,12 +70,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
.execute().actionGet();
logger.info("--> waiting for yellow status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
if (health.isTimedOut()) {
ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet();
System.out.println("" + response);
}
assertThat(health.isTimedOut(), equalTo(false));
ensureYellow();
logger.info("--> verify meta _routing required exists");
MappingMetaData mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
@ -83,12 +80,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
internalCluster().fullRestart();
logger.info("--> waiting for yellow status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet();
if (health.isTimedOut()) {
ClusterStateResponse response = client().admin().cluster().prepareState().execute().actionGet();
System.out.println("" + response);
}
assertThat(health.isTimedOut(), equalTo(false));
ensureYellow();
logger.info("--> verify meta _routing required exists");
mappingMd = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").mapping("type1");
@ -107,8 +99,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
NumShards test = getNumShards("test");
logger.info("--> waiting for green status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
@ -126,9 +117,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
logger.info("--> verifying that the state is green");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
ensureGreen();
logger.info("--> trying to index into a closed index ...");
try {
@ -141,17 +130,13 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> creating another index (test2) by indexing into it");
client().prepareIndex("test2", "type1", "1").setSource("field1", "value1").execute().actionGet();
logger.info("--> verifying that the state is green");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
ensureGreen();
logger.info("--> opening the first index again...");
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> verifying that the state is green");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
ensureGreen();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
@ -171,8 +156,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> restarting nodes...");
internalCluster().fullRestart();
logger.info("--> waiting for two nodes and green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
@ -190,8 +174,9 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
client().admin().indices().prepareOpen("test").execute().actionGet();
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
@ -243,8 +228,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
client().admin().indices().prepareCreate("test").execute().actionGet();
logger.info("--> waiting for test index to be created");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setIndices("test").setWaitForYellowStatus().execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureYellow();
client().prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet();
}
@ -272,7 +256,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> closing test index...");
client().admin().indices().prepareClose("test").execute().actionGet();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test"), nullValue());
@ -304,15 +287,14 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> restarting the nodes");
final Gateway gateway1 = internalCluster().getInstance(Gateway.class, node_1);
internalCluster().fullRestart(new RestartCallback() {
@ -327,8 +309,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
});
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// spin a bit waiting for the index to exists
long time = System.currentTimeMillis();
@ -341,8 +322,9 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> verify that the dangling index exists");
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify the doc is there");
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
@ -353,7 +335,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
Settings settings = settingsBuilder()
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "closed")
.build();
logger.info("--> starting two nodes");
final String node_1 = internalCluster().startNode(settings);
@ -363,15 +345,14 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(1l));
}
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
logger.info("--> restarting the nodes");
final Gateway gateway1 = internalCluster().getInstance(Gateway.class, node_1);
internalCluster().fullRestart(new RestartCallback() {
@ -386,8 +367,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
});
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// spin a bit waiting for the index to exists
long time = System.currentTimeMillis();
@ -400,16 +380,16 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> verify that the dangling index exists");
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify the index state is closed");
assertThat(client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
logger.info("--> open the index");
client().admin().indices().prepareOpen("test").execute().actionGet();
assertAcked(client().admin().indices().prepareOpen("test").get());
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify the doc is there");
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
@ -421,15 +401,14 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
.put("gateway.type", "local").put("gateway.local.auto_import_dangled", "no")
.build();
logger.info("--> starting two nodes");
final String node_1 = internalCluster().startNode(settings);
final String node_1 = internalCluster().startNodesAsync(2, settings).get().get(0);
internalCluster().startNode(settings);
logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
@ -440,7 +419,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> restarting the nodes");
final Gateway gateway1 = internalCluster().getInstance(Gateway.class, node_1);
internalCluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
@ -452,8 +431,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
});
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// we need to wait for the allocate dangled to kick in (even though in this case its disabled)
// just to make sure
@ -471,8 +449,9 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
});
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify that the dangling index does exists now!");
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
@ -494,8 +473,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
@ -505,7 +483,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> restarting the nodes");
final Gateway gateway1 = internalCluster().getInstance(Gateway.class, node_1);
internalCluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
if (node_1.equals(nodeName)) {
@ -517,8 +495,7 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
});
logger.info("--> waiting for green status");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ensureGreen();
logger.info("--> verify that the dangling index does not exists");
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(false));

View File

@ -222,6 +222,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
assertAcked(client().admin().indices().prepareClose("test"));
assertAcked(client().admin().indices().prepareOpen("test"));
ensureGreen();
waitForConcreteMappingsOnAll("test", "type1", "field1");
logger.info("--> Percolate doc with field1=100");
response = client().preparePercolate()