From 2b1823cf02ff5a03a06c13894768a4fa45f24075 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 1 Jul 2014 17:31:56 +0200 Subject: [PATCH] wait for mapping updates during local recovery when the primary shard is recovering its translog, make sure to wait for new mapping introductions till the mappings have been updated on the master before finalizing the recovery itself also, this change performs the mapping updates in a more optimized manner by batching the types to change into a single set and sending after the translog has been replayed also, remove the wait for mapping on master in the local state tests since this new behavior covers it closes #6666 remove waiting for mapping on master since we do it in recovery --- .../action/index/MappingUpdatedAction.java | 120 +++++++++++++++--- .../gateway/local/LocalIndexShardGateway.java | 41 +++++- .../local/LocalGatewayIndexStateTests.java | 8 -- 3 files changed, 143 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 6312a052056..e41204079a2 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.index; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -48,9 +50,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -103,7 +103,11 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listeners = Lists.newArrayList(); + + UpdateValue(MappingChange mainChange) { + this.mainChange = mainChange; + } + + public void notifyListeners(@Nullable Throwable t) { + for (MappingUpdateListener listener : listeners) { + try { + if (t == null) { + listener.onMappingUpdate(); + } else { + listener.onFailure(t); + } + } catch (Throwable lisFailure) { + logger.warn("unexpected failure on mapping update listener callback [{}]", lisFailure, listener); + } + } + } + } + @Override public void run() { + Map pendingUpdates = Maps.newHashMap(); while (running) { try { MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES); @@ -292,13 +362,23 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction> seenIndexAndTypes = Sets.newHashSet(); + // go over and add to pending updates map for (MappingChange change : changes) { - Tuple checked = Tuple.tuple(change.indexUUID, change.documentMapper.type()); - if (seenIndexAndTypes.contains(checked)) { - continue; + UpdateKey key = new UpdateKey(change.indexUUID, change.documentMapper.type()); + UpdateValue updateValue = pendingUpdates.get(key); + if (updateValue == null) { + updateValue = new UpdateValue(change); + pendingUpdates.put(key, updateValue); } - seenIndexAndTypes.add(checked); + if (change.listener != null) { + updateValue.listeners.add(change.listener); + } + } + + for (Iterator iterator = pendingUpdates.values().iterator(); iterator.hasNext(); ) { + final UpdateValue updateValue = iterator.next(); + iterator.remove(); + MappingChange change = updateValue.mainChange; final MappingUpdatedAction.MappingUpdatedRequest mappingRequest; try { @@ -312,6 +392,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction> iterator = pendingUpdates.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + iterator.remove(); + entry.getValue().notifyListeners(t); + } } } } diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index d02cb480fb8..ed8d561463f 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -19,12 +19,14 @@ package org.elasticsearch.index.gateway.local; +import com.google.common.collect.Sets; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.lucene.Lucene; @@ -52,7 +54,10 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * @@ -64,6 +69,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen private final IndexService indexService; private final InternalIndexShard indexShard; + private final TimeValue waitForMappingUpdatePostRecovery; + private final RecoveryState recoveryState = new RecoveryState(); private volatile ScheduledFuture flushScheduler; @@ -78,6 +85,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen this.indexService = indexService; this.indexShard = (InternalIndexShard) indexShard; + this.waitForMappingUpdatePostRecovery = componentSettings.getAsTime("wait_for_mapping_update_post_recovery", TimeValue.timeValueSeconds(30)); syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5)); if (syncInterval.millis() > 0) { this.indexShard.translog().syncOnEachOperation(false); @@ -215,6 +223,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen recoveryState.getTranslog().startTime(System.currentTimeMillis()); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); FileInputStream fs = null; + + final Set typesToUpdate = Sets.newHashSet(); try { fs = new FileInputStream(recoveringTranslogFile); InputStreamStreamInput si = new InputStreamStreamInput(fs); @@ -232,8 +242,10 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen } try { Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation); - if (potentialIndexOperation != null) { - mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), potentialIndexOperation.docMapper(), indexService.indexUUID()); + if (potentialIndexOperation != null && potentialIndexOperation.parsedDoc().mappingsModified()) { + if (!typesToUpdate.contains(potentialIndexOperation.docMapper().type())) { + typesToUpdate.add(potentialIndexOperation.docMapper().type()); + } } recoveryState.getTranslog().addTranslogOperations(1); } catch (ElasticsearchException e) { @@ -260,6 +272,31 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen recoveringTranslogFile.delete(); + for (final String type : typesToUpdate) { + final CountDownLatch latch = new CountDownLatch(1); + mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.mapperService().documentMapper(type), indexService.indexUUID(), new MappingUpdatedAction.MappingUpdateListener() { + @Override + public void onMappingUpdate() { + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + latch.countDown(); + logger.debug("failed to send mapping update post recovery to master for [{}]", t, type); + } + }); + + try { + boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS); + if (!waited) { + logger.debug("waited for mapping update on master for [{}], yet timed out"); + } + } catch (InterruptedException e) { + logger.debug("interrupted while waiting for mapping update"); + } + } + recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime()); } diff --git a/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java b/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java index 1f5c9641db8..d5f254c58f7 100644 --- a/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java +++ b/src/test/java/org/elasticsearch/gateway/local/LocalGatewayIndexStateTests.java @@ -175,8 +175,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest { logger.info("--> waiting for green status"); ensureGreen(); - // we need to wait for mapping on master since the mapping update from translog update might get delayed - waitForMappingOnMaster("test", "type1"); stateResponse = client().admin().cluster().prepareState().execute().actionGet(); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN)); @@ -323,8 +321,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest { assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); logger.info("--> waiting for green status"); ensureGreen(); - // we need to wait for mapping on master since the mapping update from translog update might get delayed - waitForMappingOnMaster("test", "type1"); logger.info("--> verify the doc is there"); assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); @@ -388,8 +384,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest { assertAcked(client().admin().indices().prepareOpen("test").get()); logger.info("--> waiting for green status"); ensureGreen(); - // we need to wait for mapping on master since the mapping update from translog update might get delayed - waitForMappingOnMaster("test", "type1"); logger.info("--> verify the doc is there"); assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); @@ -450,8 +444,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest { logger.info("--> waiting for green status"); ensureGreen(); - // we need to wait for mapping on master since the mapping update from translog update might get delayed - waitForMappingOnMaster("test", "type1"); logger.info("--> verify that the dangling index does exists now!"); assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));