Recovery: restart recovery upon mapping changes during translog replay

In rare occasion, the translog replay phase of recovery may require mapping changes on the target shard. This can happen where indexing on the primary introduces new mappings while the recovery is in phase1. If the source node processes the new mapping from the master, allowing the indexing to proceed, before the target node does and the recovery moves to the phase 2 (translog replay) before as well, the translog operations arriving on the target node may miss the mapping changes. To protect agains this we now throw and catch an exception, so we can properly wait and retry when the next cluster state arrives.

Closes #11281
Closes #11363
This commit is contained in:
Boaz Leskes 2015-05-27 09:11:30 +03:00
parent 129d8ec29a
commit ea41ee9243
6 changed files with 76 additions and 37 deletions

View File

@ -62,9 +62,10 @@ public class ClusterStateObserver {
/**
* @param clusterService
* @param timeout a global timeout for this observer. After it has expired the observer
* will fail any existing or new #waitForNextChange calls.
* will fail any existing or new #waitForNextChange calls. Set to null
* to wait indefinitely
*/
public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) {
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;

View File

@ -20,15 +20,10 @@
package org.elasticsearch.index.engine;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@ -219,7 +214,7 @@ public class InternalEngine extends Engine {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
handler.performRecoveryOperation(this, operation);
handler.performRecoveryOperation(this, operation, true);
opsRecovered++;
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {

View File

@ -804,8 +804,8 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* Note: This method is typically used in peer recovery to replay remote tansaction log entries.
* This operation will stop applying operations once an operation failed to apply.
* Note: This method is typically used in peer recovery to replay remote transaction log entries.
*/
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
@ -1389,7 +1389,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* Returns the current translog durability mode
*/
public Translog.Durabilty getTranslogDurability() {
return translogConfig.getDurabilty();
return translogConfig.getDurabilty();
}
private static Translog.Durabilty getFromSettings(ESLogger logger, Settings settings, Translog.Durabilty defaultValue) {

View File

@ -33,12 +33,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperUtils;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryParsingException;
@ -73,20 +68,28 @@ public class TranslogRecoveryPerformer {
return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}
/*
/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* This operation will stop applying operations once an operation failed to apply.
*
* Throws a {@link MapperException} to be thrown if a mapping update is encountered.
*/
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
int numOps = 0;
for (Translog.Operation operation : operations) {
performRecoveryOperation(engine, operation);
performRecoveryOperation(engine, operation, false);
numOps++;
}
return numOps;
}
private void addMappingUpdate(String type, Mapping update) {
private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
if (update == null) {
return;
}
if (allowMappingUpdates == false) {
throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
}
Mapping currentUpdate = recoveredTypes.get(type);
if (currentUpdate == null) {
recoveredTypes.put(type, update);
@ -96,10 +99,13 @@ public class TranslogRecoveryPerformer {
}
/**
* Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
* that can then be used for mapping updates (for example) if needed.
* Performs a single recovery operation.
*
* @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will
* cause a {@link MapperException} to be thrown if an update
* is encountered.
*/
public void performRecoveryOperation(Engine engine, Translog.Operation operation) {
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
case CREATE:
@ -109,10 +115,8 @@ public class TranslogRecoveryPerformer {
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
mapperAnalyzer.setType(create.type()); // this is a PITA - once mappings are per index not per type this can go away an we can just simply move this to the engine eventually :)
maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
engine.create(engineCreate);
if (engineCreate.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate());
}
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
@ -120,10 +124,8 @@ public class TranslogRecoveryPerformer {
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
mapperAnalyzer.setType(index.type());
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
engine.index(engineIndex);
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate());
}
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;

View File

@ -26,8 +26,12 @@ import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
@ -41,6 +45,7 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
@ -294,13 +299,51 @@ public class RecoveryTarget extends AbstractComponent {
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
translog.totalOperations(request.totalTranslogOps());
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
recoveryStatus.indexShard().performBatchRecovery(request.operations());
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
} catch (MapperException mapperException) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes", mapperException);
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel);
} catch (Exception e) {
onFailure(e);
}
}
protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
}
}
@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
}
@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + "])"));
}
});
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);

View File

@ -47,10 +47,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
@ -349,7 +347,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
shard.state = IndexShardState.RECOVERING;
try {
shard.recoveryState().getTranslog().totalOperations(1);
shard.engine().config().getTranslogRecoveryPerformer().performRecoveryOperation(shard.engine(), new Translog.DeleteByQuery(new Engine.DeleteByQuery(null, new BytesArray("{\"term\" : { \"user\" : \"kimchy\" }}"), null, null, null, Engine.Operation.Origin.RECOVERY, 0, "person")));
shard.engine().config().getTranslogRecoveryPerformer().performRecoveryOperation(shard.engine(), new Translog.DeleteByQuery(new Engine.DeleteByQuery(null, new BytesArray("{\"term\" : { \"user\" : \"kimchy\" }}"), null, null, null, Engine.Operation.Origin.RECOVERY, 0, "person")), false);
assertTrue(version.onOrBefore(Version.V_1_0_0_Beta2));
numDocs = 0;
} catch (QueryParsingException ex) {