wait for mapping updates during local recovery

When the primary shard is recovering its translog, wait until newly introduced mappings have been updated on the master before finalizing the recovery itself.
This change also performs the mapping updates in a more optimized manner: the types to change are batched into a single set and sent after the translog has been replayed.

Also, remove the wait for mapping on master in the local state tests, since this new behavior covers it.

closes #6666

remove waiting for mapping on master since we do it in recovery
This commit is contained in:
Shay Banon 2014-07-01 17:31:56 +02:00
parent 72d2ac1328
commit 2b1823cf02
3 changed files with 143 additions and 26 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.action.index; package org.elasticsearch.cluster.action.index;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -33,6 +34,7 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -48,9 +50,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -103,7 +103,11 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
} }
public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID) { public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID) {
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID)); updateMappingOnMaster(index, documentMapper, indexUUID, null);
}
public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID, MappingUpdateListener listener) {
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID, listener));
} }
@Override @Override
@ -243,14 +247,26 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
public final DocumentMapper documentMapper; public final DocumentMapper documentMapper;
public final String index; public final String index;
public final String indexUUID; public final String indexUUID;
public final MappingUpdateListener listener;
MappingChange(DocumentMapper documentMapper, String index, String indexUUID) { MappingChange(DocumentMapper documentMapper, String index, String indexUUID, MappingUpdateListener listener) {
this.documentMapper = documentMapper; this.documentMapper = documentMapper;
this.index = index; this.index = index;
this.indexUUID = indexUUID; this.indexUUID = indexUUID;
this.listener = listener;
} }
} }
/**
* A listener to be notified when the mappings were updated
* on the master, or when that update failed. A listener can be passed to
* updateMappingOnMaster together with the mapping change it wants to track.
*/
public static interface MappingUpdateListener {
/**
* Invoked after the master responded successfully to the mapping update.
*/
void onMappingUpdate();
/**
* Invoked when the mapping update could not be completed.
*
* @param t the failure that prevented the update
*/
void onFailure(Throwable t);
}
/** /**
* The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the * The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the
* indexing thread. * indexing thread.
@ -278,8 +294,62 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
this.interrupt(); this.interrupt();
} }
class UpdateKey {
public final String indexUUID;
public final String type;
UpdateKey(String indexUUID, String type) {
this.indexUUID = indexUUID;
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateKey updateKey = (UpdateKey) o;
if (!indexUUID.equals(updateKey.indexUUID)) return false;
if (!type.equals(updateKey.type)) return false;
return true;
}
@Override
public int hashCode() {
int result = indexUUID.hashCode();
result = 31 * result + type.hashCode();
return result;
}
}
/**
 * Value stored per pending update: the mapping change that is sent to the master and
 * the listeners that have to be told about the outcome of that update.
 */
class UpdateValue {
    public final MappingChange mainChange;
    public final List<MappingUpdateListener> listeners = Lists.newArrayList();

    UpdateValue(MappingChange mainChange) {
        this.mainChange = mainChange;
    }

    /**
     * Calls back every registered listener. A {@code null} throwable reports success,
     * anything else is forwarded as the failure. A listener that throws is logged and
     * does not prevent the remaining listeners from being called.
     *
     * @param t the failure to report, or {@code null} when the update succeeded
     */
    public void notifyListeners(@Nullable Throwable t) {
        for (MappingUpdateListener callback : listeners) {
            try {
                if (t != null) {
                    callback.onFailure(t);
                } else {
                    callback.onMappingUpdate();
                }
            } catch (Throwable callbackFailure) {
                logger.warn("unexpected failure on mapping update listener callback [{}]", callbackFailure, callback);
            }
        }
    }
}
@Override @Override
public void run() { public void run() {
Map<UpdateKey, UpdateValue> pendingUpdates = Maps.newHashMap();
while (running) { while (running) {
try { try {
MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES); MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES);
@ -292,13 +362,23 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
} }
queue.drainTo(changes); queue.drainTo(changes);
Collections.reverse(changes); // process then in newest one to oldest Collections.reverse(changes); // process then in newest one to oldest
Set<Tuple<String, String>> seenIndexAndTypes = Sets.newHashSet(); // go over and add to pending updates map
for (MappingChange change : changes) { for (MappingChange change : changes) {
Tuple<String, String> checked = Tuple.tuple(change.indexUUID, change.documentMapper.type()); UpdateKey key = new UpdateKey(change.indexUUID, change.documentMapper.type());
if (seenIndexAndTypes.contains(checked)) { UpdateValue updateValue = pendingUpdates.get(key);
continue; if (updateValue == null) {
updateValue = new UpdateValue(change);
pendingUpdates.put(key, updateValue);
} }
seenIndexAndTypes.add(checked); if (change.listener != null) {
updateValue.listeners.add(change.listener);
}
}
for (Iterator<UpdateValue> iterator = pendingUpdates.values().iterator(); iterator.hasNext(); ) {
final UpdateValue updateValue = iterator.next();
iterator.remove();
MappingChange change = updateValue.mainChange;
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest; final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try { try {
@ -312,6 +392,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
); );
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.documentMapper.type() + "]", t); logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.documentMapper.type() + "]", t);
updateValue.notifyListeners(t);
continue; continue;
} }
logger.trace("sending mapping updated to master: {}", mappingRequest); logger.trace("sending mapping updated to master: {}", mappingRequest);
@ -319,23 +400,30 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
@Override @Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
logger.debug("successfully updated master with mapping update: {}", mappingRequest); logger.debug("successfully updated master with mapping update: {}", mappingRequest);
updateValue.notifyListeners(null);
} }
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
logger.warn("failed to update master on updated mapping for {}", e, mappingRequest); logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
updateValue.notifyListeners(e);
} }
}); });
} }
} catch (InterruptedException e) {
// are we shutting down? continue and check
if (running) {
logger.warn("failed to process mapping updates", e);
}
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof InterruptedException && !running) {
// all is well, we are shutting down
} else {
logger.warn("failed to process mapping updates", t); logger.warn("failed to process mapping updates", t);
} }
// cleanup all pending update callbacks that were not processed due to a global failure...
for (Iterator<Map.Entry<UpdateKey, UpdateValue>> iterator = pendingUpdates.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<UpdateKey, UpdateValue> entry = iterator.next();
iterator.remove();
entry.getValue().notifyListeners(t);
}
}
} }
} }
} }

View File

@ -19,12 +19,14 @@
package org.elasticsearch.index.gateway.local; package org.elasticsearch.index.gateway.local;
import com.google.common.collect.Sets;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
@ -52,7 +54,10 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -64,6 +69,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
private final IndexService indexService; private final IndexService indexService;
private final InternalIndexShard indexShard; private final InternalIndexShard indexShard;
private final TimeValue waitForMappingUpdatePostRecovery;
private final RecoveryState recoveryState = new RecoveryState(); private final RecoveryState recoveryState = new RecoveryState();
private volatile ScheduledFuture flushScheduler; private volatile ScheduledFuture flushScheduler;
@ -78,6 +85,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
this.indexService = indexService; this.indexService = indexService;
this.indexShard = (InternalIndexShard) indexShard; this.indexShard = (InternalIndexShard) indexShard;
this.waitForMappingUpdatePostRecovery = componentSettings.getAsTime("wait_for_mapping_update_post_recovery", TimeValue.timeValueSeconds(30));
syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5)); syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0) { if (syncInterval.millis() > 0) {
this.indexShard.translog().syncOnEachOperation(false); this.indexShard.translog().syncOnEachOperation(false);
@ -215,6 +223,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveryState.getTranslog().startTime(System.currentTimeMillis()); recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
FileInputStream fs = null; FileInputStream fs = null;
final Set<String> typesToUpdate = Sets.newHashSet();
try { try {
fs = new FileInputStream(recoveringTranslogFile); fs = new FileInputStream(recoveringTranslogFile);
InputStreamStreamInput si = new InputStreamStreamInput(fs); InputStreamStreamInput si = new InputStreamStreamInput(fs);
@ -232,8 +242,10 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} }
try { try {
Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation); Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation);
if (potentialIndexOperation != null) { if (potentialIndexOperation != null && potentialIndexOperation.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), potentialIndexOperation.docMapper(), indexService.indexUUID()); if (!typesToUpdate.contains(potentialIndexOperation.docMapper().type())) {
typesToUpdate.add(potentialIndexOperation.docMapper().type());
}
} }
recoveryState.getTranslog().addTranslogOperations(1); recoveryState.getTranslog().addTranslogOperations(1);
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
@ -260,6 +272,31 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveringTranslogFile.delete(); recoveringTranslogFile.delete();
// Send the mapping of every type collected in typesToUpdate to the master and wait,
// for at most waitForMappingUpdatePostRecovery per type, until the master answered
// (successfully or not) before the translog recovery time is recorded.
for (final String type : typesToUpdate) {
    final CountDownLatch latch = new CountDownLatch(1);
    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.mapperService().documentMapper(type), indexService.indexUUID(), new MappingUpdatedAction.MappingUpdateListener() {
        @Override
        public void onMappingUpdate() {
            latch.countDown();
        }

        @Override
        public void onFailure(Throwable t) {
            // a failed update also releases the waiter; recovery is not blocked on it
            latch.countDown();
            logger.debug("failed to send mapping update post recovery to master for [{}]", t, type);
        }
    });
    try {
        boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS);
        if (!waited) {
            // pass the type so the [{}] placeholder is actually filled in
            logger.debug("waited for mapping update on master for [{}], yet timed out", type);
        }
    } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is intentionally left as the original had it
        // (not restored); confirm this is desired for the recovery thread.
        logger.debug("interrupted while waiting for mapping update");
    }
}
recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime()); recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
} }

View File

@ -175,8 +175,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> waiting for green status"); logger.info("--> waiting for green status");
ensureGreen(); ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
stateResponse = client().admin().cluster().prepareState().execute().actionGet(); stateResponse = client().admin().cluster().prepareState().execute().actionGet();
assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN)); assertThat(stateResponse.getState().metaData().index("test").state(), equalTo(IndexMetaData.State.OPEN));
@ -323,8 +321,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
logger.info("--> waiting for green status"); logger.info("--> waiting for green status");
ensureGreen(); ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify the doc is there"); logger.info("--> verify the doc is there");
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
@ -388,8 +384,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
assertAcked(client().admin().indices().prepareOpen("test").get()); assertAcked(client().admin().indices().prepareOpen("test").get());
logger.info("--> waiting for green status"); logger.info("--> waiting for green status");
ensureGreen(); ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify the doc is there"); logger.info("--> verify the doc is there");
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true)); assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(true));
@ -450,8 +444,6 @@ public class LocalGatewayIndexStateTests extends ElasticsearchIntegrationTest {
logger.info("--> waiting for green status"); logger.info("--> waiting for green status");
ensureGreen(); ensureGreen();
// we need to wait for mapping on master since the mapping update from translog update might get delayed
waitForMappingOnMaster("test", "type1");
logger.info("--> verify that the dangling index does exists now!"); logger.info("--> verify that the dangling index does exists now!");
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));