diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
index 378da826016..60cb506f7da 100644
--- a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
+++ b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
@@ -62,9 +62,10 @@ public class ClusterStateObserver {
     /**
      * @param clusterService
      * @param timeout        a global timeout for this observer. After it has expired the observer
-     *                       will fail any existing or new #waitForNextChange calls.
+     *                       will fail any existing or new #waitForNextChange calls. Set to null
+     *                       to wait indefinitely
      */
-    public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) {
+    public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
         this.clusterService = clusterService;
         this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
         this.timeOutValue = timeout;
diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 42bab8ca7fe..46158f93035 100644
--- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -20,15 +20,10 @@
 package org.elasticsearch.index.engine;
 
 import com.google.common.collect.Lists;
-
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.BytesRef;
@@ -219,7 +214,7 @@ public class InternalEngine extends Engine {
             Translog.Operation operation;
             while ((operation = snapshot.next()) != null) {
                 try {
-                    handler.performRecoveryOperation(this, operation);
+                    handler.performRecoveryOperation(this, operation, true);
                     opsRecovered++;
                 } catch (ElasticsearchException e) {
                     if (e.status() == RestStatus.BAD_REQUEST) {
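The `@Nullable` timeout above changes the observer's contract: a null value means the observer never times out on its own, and cancellation has to come from elsewhere. A minimal usage sketch, not part of this commit, assuming a `ClusterService` and `ESLogger` are available from the enclosing component:

```java
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;

class ObserverUsageSketch {

    void waitForAnyStateChange(ClusterService clusterService, final ESLogger logger) {
        // null timeout: wait indefinitely; the caller relies on some outer
        // protection (e.g. the recovery inactivity monitor) for cancellation
        ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                logger.trace("observed new cluster state, version [{}]", state.version());
            }

            @Override
            public void onClusterServiceClose() {
                logger.debug("cluster service closed while waiting for a state change");
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // not reachable with a null timeout, but required by the interface
            }
        });
    }
}
```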
diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 75b322f45a6..c372b6a3cd2 100644
--- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -804,8 +804,8 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     /**
      * Applies all operations in the iterable to the current engine and returns the number of operations applied.
-     * This operation will stop applying operations once an opertion failed to apply.
-     * Note: This method is typically used in peer recovery to replay remote tansaction log entries.
+     * This operation will stop applying operations once an operation fails to apply.
+     * Note: This method is typically used in peer recovery to replay remote transaction log entries.
      */
     public int performBatchRecovery(Iterable<Translog.Operation> operations) {
         if (state != IndexShardState.RECOVERING) {
@@ -1389,7 +1389,7 @@
      * Returns the current translog durability mode
      */
     public Translog.Durabilty getTranslogDurability() {
-        return translogConfig.getDurabilty();
+        return translogConfig.getDurabilty();
     }
 
     private static Translog.Durabilty getFromSettings(ESLogger logger, Settings settings, Translog.Durabilty defaultValue) {
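The new `allowMappingUpdates` flag separates the two replay paths touched by this change: local translog replay in `InternalEngine` passes `true`, while the peer-recovery path through `IndexShard.performBatchRecovery` ends up with `false`. A hedged sketch of those two call sites; the class and method names come from the diff itself, the local variables stand in for state the real callers already hold:

```java
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;

class RecoveryCallSitesSketch {

    // Local translog replay (the InternalEngine call site): this node already has
    // the mappings that produced these operations, so collecting dynamic mapping
    // updates is safe -> allowMappingUpdates = true.
    void localTranslogReplay(TranslogRecoveryPerformer performer, Engine engine, Translog.Operation op) {
        performer.performRecoveryOperation(engine, op, true);
    }

    // Peer recovery (the IndexShard.performBatchRecovery path): a dynamic mapping
    // update here means the mapping has not arrived via cluster state yet, so the
    // performer is invoked with allowMappingUpdates = false and a MapperException
    // escapes to the caller, which can wait for the mapping and retry.
    void peerRecoveryReplay(TranslogRecoveryPerformer performer, Engine engine, Iterable<Translog.Operation> ops) {
        for (Translog.Operation op : ops) {
            performer.performRecoveryOperation(engine, op, false); // may throw MapperException
        }
    }
}
```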
diff --git a/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
index bbe613b61c9..0b62ea7ea61 100644
--- a/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
+++ b/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
@@ -33,12 +33,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
-import org.elasticsearch.index.mapper.DocumentMapper;
-import org.elasticsearch.index.mapper.MapperAnalyzer;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.MapperUtils;
-import org.elasticsearch.index.mapper.Mapping;
-import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.mapper.*;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.query.ParsedQuery;
 import org.elasticsearch.index.query.QueryParsingException;
@@ -73,20 +68,28 @@ public class TranslogRecoveryPerformer {
         return mapperService.documentMapperWithAutoCreate(type); // protected for testing
     }
 
-    /*
+    /**
      * Applies all operations in the iterable to the current engine and returns the number of operations applied.
-     * This operation will stop applying operations once an opertion failed to apply.
+     * This operation will stop applying operations once an operation fails to apply.
+     *
+     * Causes a {@link MapperException} to be thrown if a mapping update is encountered.
      */
     int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
         int numOps = 0;
         for (Translog.Operation operation : operations) {
-            performRecoveryOperation(engine, operation);
+            performRecoveryOperation(engine, operation, false);
             numOps++;
         }
         return numOps;
     }
 
-    private void addMappingUpdate(String type, Mapping update) {
+    private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
+        if (update == null) {
+            return;
+        }
+        if (allowMappingUpdates == false) {
+            throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
+        }
         Mapping currentUpdate = recoveredTypes.get(type);
         if (currentUpdate == null) {
             recoveredTypes.put(type, update);
@@ -96,10 +99,13 @@ public class TranslogRecoveryPerformer {
     }
 
     /**
-     * Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
-     * that can then be used for mapping updates (for example) if needed.
+     * Performs a single recovery operation.
+     *
+     * @param allowMappingUpdates true if mapping updates should be accepted (and collected). Setting it to false will
+     *                            cause a {@link MapperException} to be thrown if an update
+     *                            is encountered.
      */
-    public void performRecoveryOperation(Engine engine, Translog.Operation operation) {
+    public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
         try {
             switch (operation.opType()) {
                 case CREATE:
@@ -109,10 +115,8 @@ public class TranslogRecoveryPerformer {
                     Translog.Create create = (Translog.Create) operation;
                     Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
                             source(create.source()).type(create.type()).id(create.id())
                                     .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
                             create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
                     mapperAnalyzer.setType(create.type()); // this is a PITA - once mappings are per index not per type this can go away an we can just simply move this to the engine eventually :)
+                    maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
                     engine.create(engineCreate);
-                    if (engineCreate.parsedDoc().dynamicMappingsUpdate() != null) {
-                        addMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate());
-                    }
                     break;
                 case SAVE:
                     Translog.Index index = (Translog.Index) operation;
@@ -120,10 +124,8 @@ public class TranslogRecoveryPerformer {
                             .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
                             index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
                     mapperAnalyzer.setType(index.type());
+                    maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
                     engine.index(engineIndex);
-                    if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
-                        addMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate());
-                    }
                     break;
                 case DELETE:
                     Translog.Delete delete = (Translog.Delete) operation;
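The guard order in `maybeAddMappingUpdate` matters: a null update is a no-op regardless of the flag, so only an operation that actually carries a dynamic mapping update can fail the replay. A standalone, simplified model of that logic; plain strings stand in for `Mapping`, `IllegalStateException` stands in for `MapperException`, and string concatenation stands in for the `MapperUtils` merge:

```java
import java.util.HashMap;
import java.util.Map;

class MappingUpdateGuardSketch {

    static final Map<String, String> recoveredTypes = new HashMap<>();

    static void maybeAddMappingUpdate(String type, String update, String docId, boolean allowMappingUpdates) {
        if (update == null) {
            return; // no dynamic mapping update pending: a no-op regardless of the flag
        }
        if (allowMappingUpdates == false) {
            throw new IllegalStateException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
        }
        // the real code merges into any previously collected Mapping for the type
        recoveredTypes.merge(type, update, (current, next) -> current + "," + next);
    }

    public static void main(String[] args) {
        maybeAddMappingUpdate("person", null, "1", false);  // fine: nothing to record
        maybeAddMappingUpdate("person", "u1", "2", true);   // collected
        try {
            maybeAddMappingUpdate("person", "u2", "3", false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("collected updates: " + recoveredTypes);
    }
}
```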
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 26ce397eda5..49bf2de0ea7 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -26,8 +26,12 @@ import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -41,6 +45,7 @@ import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.IndexShardMissingException;
 import org.elasticsearch.index.engine.RecoveryEngineException;
+import org.elasticsearch.index.mapper.MapperException;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
@@ -294,13 +299,51 @@ public class RecoveryTarget extends AbstractComponent {
     class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
 
         @Override
-        public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
+        public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
             try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
+                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
                 final RecoveryStatus recoveryStatus = statusRef.status();
                 final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
                 translog.totalOperations(request.totalTranslogOps());
                 assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
-                recoveryStatus.indexShard().performBatchRecovery(request.operations());
+                try {
+                    recoveryStatus.indexShard().performBatchRecovery(request.operations());
+                } catch (MapperException mapperException) {
+                    // in very rare cases a translog replay from primary is processed before a mapping update on this node
+                    // which causes local mapping changes. we want to wait until these mappings are processed.
+                    logger.trace("delaying recovery due to missing mapping changes", mapperException);
+                    // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
+                    // canceled)
+                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                        @Override
+                        public void onNewClusterState(ClusterState state) {
+                            try {
+                                messageReceived(request, channel);
+                            } catch (Exception e) {
+                                onFailure(e);
+                            }
+                        }
+
+                        protected void onFailure(Exception e) {
+                            try {
+                                channel.sendResponse(e);
+                            } catch (IOException e1) {
+                                logger.warn("failed to send error back to recovery source", e1);
+                            }
+                        }
+
+                        @Override
+                        public void onClusterServiceClose() {
+                            onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
+                        }
+
+                        @Override
+                        public void onTimeout(TimeValue timeout) {
+                            // note that we do not use a timeout (see comment above)
+                            onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + "])"));
+                        }
+                    });
+                }
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
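The handler above retries asynchronously rather than blocking a transport thread: on `MapperException` it parks the request with the observer and re-enters `messageReceived` when the next cluster state, hopefully carrying the missing mapping, arrives. A standalone, runnable model of that control flow; all names here are invented for the sketch, the real code uses `ClusterStateObserver` and transport channels:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class RetryOnClusterStateSketch {

    static final AtomicBoolean mappingArrived = new AtomicBoolean(false);

    // stands in for performBatchRecovery: fails while the mapping is missing
    static void applyBatch() {
        if (!mappingArrived.get()) {
            throw new IllegalStateException("mapping updates are not allowed"); // models MapperException
        }
        System.out.println("batch applied");
    }

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch stateChange = new CountDownLatch(1);

        // simulated mapping update arriving later via a cluster state change
        new Thread(() -> {
            mappingArrived.set(true);
            stateChange.countDown();
        }).start();

        try {
            applyBatch();
        } catch (IllegalStateException e) {
            // real code: observer.waitForNextChange(...) then re-invoke messageReceived
            stateChange.await();
            applyBatch(); // the retry succeeds once the mapping is present
        }
    }
}
```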
diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 3e7fcc79d55..45fed8a5941 100644
--- a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -47,10 +47,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
@@ -349,7 +347,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
         shard.state = IndexShardState.RECOVERING;
         try {
             shard.recoveryState().getTranslog().totalOperations(1);
-            shard.engine().config().getTranslogRecoveryPerformer().performRecoveryOperation(shard.engine(), new Translog.DeleteByQuery(new Engine.DeleteByQuery(null, new BytesArray("{\"term\" : { \"user\" : \"kimchy\" }}"), null, null, null, Engine.Operation.Origin.RECOVERY, 0, "person")));
+            shard.engine().config().getTranslogRecoveryPerformer().performRecoveryOperation(shard.engine(), new Translog.DeleteByQuery(new Engine.DeleteByQuery(null, new BytesArray("{\"term\" : { \"user\" : \"kimchy\" }}"), null, null, null, Engine.Operation.Origin.RECOVERY, 0, "person")), false);
             assertTrue(version.onOrBefore(Version.V_1_0_0_Beta2));
             numDocs = 0;
         } catch (QueryParsingException ex) {
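The test change above only threads the new flag through an existing delete-by-query replay; it does not exercise the rejection path itself. A possible follow-up test is sketched below. The helper `createOperationWithDynamicMappingUpdate()` is hypothetical; a real test would have to replay an indexing operation whose document introduces a new dynamic field:

```java
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;

class AllowMappingUpdatesTestSketch {

    // hypothetical helper: builds a translog operation whose document parsing
    // produces a dynamic mapping update
    Translog.Operation createOperationWithDynamicMappingUpdate() {
        throw new UnsupportedOperationException("sketch only");
    }

    void assertRejectedDuringPeerRecovery(TranslogRecoveryPerformer performer, Engine engine) {
        try {
            performer.performRecoveryOperation(engine, createOperationWithDynamicMappingUpdate(), false);
            throw new AssertionError("expected a MapperException");
        } catch (MapperException expected) {
            // expected: peer recovery must wait for the mapping to arrive via a cluster state update
        }
    }
}
```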