[Relocation] process pending mapping update in phase 2

During phase1 we copy over all lucene segments. These may refer to mapping updates that are still queued up to be sent to master. We must make sure those pending updates are processed before completing the relocation.

Relates to #6648

Closes #6762
This commit is contained in:
Boaz Leskes 2014-07-07 09:22:13 +02:00
parent 7023caa1a1
commit 7f5f4e842e
3 changed files with 65 additions and 15 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.action.index;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
@ -35,7 +34,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -50,7 +48,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -305,13 +306,21 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UpdateKey updateKey = (UpdateKey) o; UpdateKey updateKey = (UpdateKey) o;
if (!indexUUID.equals(updateKey.indexUUID)) return false; if (!indexUUID.equals(updateKey.indexUUID)) {
if (!type.equals(updateKey.type)) return false; return false;
}
if (!type.equals(updateKey.type)) {
return false;
}
return true; return true;
} }

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
@ -38,6 +39,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -53,6 +56,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -68,6 +72,7 @@ public class RecoverySource extends AbstractComponent {
private final TransportService transportService; private final TransportService transportService;
private final IndicesService indicesService; private final IndicesService indicesService;
private final RecoverySettings recoverySettings; private final RecoverySettings recoverySettings;
private final MappingUpdatedAction mappingUpdatedAction;
private final ClusterService clusterService; private final ClusterService clusterService;
@ -77,10 +82,11 @@ public class RecoverySource extends AbstractComponent {
@Inject @Inject
public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService, public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService,
RecoverySettings recoverySettings, ClusterService clusterService) { RecoverySettings recoverySettings, MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.clusterService = clusterService; this.clusterService = clusterService;
this.recoverySettings = recoverySettings; this.recoverySettings = recoverySettings;
@ -91,7 +97,8 @@ public class RecoverySource extends AbstractComponent {
} }
private RecoveryResponse recover(final StartRecoveryRequest request) { private RecoveryResponse recover(final StartRecoveryRequest request) {
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); final IndexService indexService = indicesService.indexServiceSafe(request.shardId().index().name());
final InternalIndexShard shard = (InternalIndexShard) indexService.shardSafe(request.shardId().id());
// verify that our (the source) shard state is marking the shard to be in recovery mode as well, otherwise // verify that our (the source) shard state is marking the shard to be in recovery mode as well, otherwise
// the index operations will not be routed to it properly // the index operations will not be routed to it properly
@ -251,22 +258,58 @@ public class RecoverySource extends AbstractComponent {
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId()); throw new IndexShardClosedException(request.shardId());
} }
logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode()); logger.trace("{} recovery [phase2] to {}: start", request.shardId(), request.targetNode());
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop(); stopWatch.stop();
response.startTime = stopWatch.totalTime().millis(); response.startTime = stopWatch.totalTime().millis();
logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); logger.trace("{} recovery [phase2] to {}: start took [{}]", request.shardId(), request.targetNode(), request.targetNode(), stopWatch.totalTime());
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
logger.trace("{} recovery [phase2] to {}: updating current mapping to master", request.shardId(), request.targetNode());
updateMappingOnMaster();
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
stopWatch = new StopWatch().start(); stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot); int totalOperations = sendSnapshot(snapshot);
stopWatch.stop(); stopWatch.stop();
logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis(); response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations; response.phase2Operations = totalOperations;
} }
private void updateMappingOnMaster() {
List<DocumentMapper> documentMappersToUpdate = Lists.newArrayList(indexService.mapperService());
if (documentMappersToUpdate.size() == 0) {
return;
}
final CountDownLatch countDownLatch = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
countDownLatch.countDown();
}
@Override
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
countDownLatch.countDown();
}
};
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper, indexService.indexUUID(), listener);
}
try {
if (!countDownLatch.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.debug("{} recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]", request.shardId(), request.targetNode(), internalActionTimeout);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("interrupted while waiting for mapping to update on master");
}
}
@Override @Override
public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException { public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException {
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.client.Requests.*; import static org.elasticsearch.client.Requests.*;
@ -50,7 +49,6 @@ public class SimpleRecoveryTests extends ElasticsearchIntegrationTest {
} }
@Test @Test
@TestLogging(value = "cluster.service:TRACE,action.get:TRACE")
public void testSimpleRecovery() throws Exception { public void testSimpleRecovery() throws Exception {
assertAcked(prepareCreate("test", 1).execute().actionGet(5000)); assertAcked(prepareCreate("test", 1).execute().actionGet(5000));