Merge pull request #10786 from jpountz/fix/dynamic_mappings_on_replicas

Internal: Wait for required mappings to be available on the replica before indexing.
This commit is contained in:
Adrien Grand 2015-04-24 22:20:50 +02:00
commit 46ac32ad4a
12 changed files with 338 additions and 86 deletions

View File

@ -23,15 +23,18 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
/**
* Base class for write action responses.
@ -156,6 +159,11 @@ public abstract class ActionWriteResponse extends ActionResponse {
return builder;
}
@Override
public String toString() {
return Strings.toString(this);
}
public static ShardInfo readShardInfo(StreamInput in) throws IOException {
ShardInfo shardInfo = new ShardInfo();
shardInfo.readFrom(in);

View File

@ -291,6 +291,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
} else {
throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request());
}
assert item.getPrimaryResponse() != null;
@ -532,7 +534,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) throws Exception {
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
for (int i = 0; i < request.items().length; i++) {
@ -548,28 +550,18 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}
@ -592,6 +584,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
throw e;
}
}
} else {
throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request());
}
}

View File

@ -105,4 +105,17 @@ public class IndexResponse extends ActionWriteResponse {
out.writeLong(version);
out.writeBoolean(created);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("IndexResponse[");
builder.append("index=").append(index);
builder.append(",type=").append(type);
builder.append(",id=").append(id);
builder.append(",version=").append(version);
builder.append(",created=").append(created);
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.index;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
@ -54,8 +53,6 @@ import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* Performs the index operation.
* <p/>
@ -73,6 +70,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;
private final ClusterService clusterService;
@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
@ -83,6 +82,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService;
}
@Override
@ -201,6 +201,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
version = index.version();
created = index.created();
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
@ -244,34 +245,24 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) throws IOException {
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}

View File

@ -21,10 +21,11 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
@ -35,11 +36,13 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -48,12 +51,21 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
@ -112,7 +124,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest) throws Exception;
protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest);
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
@ -203,12 +215,77 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
@Override
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
try {
shardOperationOnReplica(request.internalShardId, request);
} catch (Throwable t) {
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
throw t;
new AsyncReplicaAction(request, channel).run();
}
}
protected static class RetryOnReplicaException extends IndexShardException {
public RetryOnReplicaException(ShardId shardId, String msg) {
super(shardId, msg);
}
public RetryOnReplicaException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}
private final class AsyncReplicaAction extends AbstractRunnable {
private final ReplicaRequest request;
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
this.channel = channel;
}
@Override
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica", t);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
threadPool.executor(executor).execute(AsyncReplicaAction.this);
}
@Override
public void onClusterServiceClose() {
responseWithFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
throw new AssertionError("Cannot happen: there is not timeout");
}
});
} else {
try {
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
} catch (Throwable unexpected) {
logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected);
} finally {
responseWithFailure(t);
}
}
}
protected void responseWithFailure(Throwable t) {
try {
channel.sendResponse(t);
} catch (IOException responseException) {
logger.warn("failed to send error message back to client for action [" + transportReplicaAction + "]", responseException);
logger.warn("actual Exception", t);
}
}
@Override
protected void doRun() throws Exception {
shardOperationOnReplica(request.internalShardId, request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
@ -95,8 +96,10 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Adds a cluster state listener that will timeout after the provided timeout,
* and is executed after the clusterstate has been successfully applied ie. is
* in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED}
* NOTE: a {@code null} timeout means that the listener will never be removed
* automatically
*/
void add(TimeValue timeout, TimeoutClusterStateListener listener);
void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);
/**
* Submits a task that will update the cluster state.

View File

@ -42,18 +42,18 @@ public class ClusterStateObserver {
return changedEvent.previousState().version() != changedEvent.state().version();
}
};
private ClusterService clusterService;
private final ClusterService clusterService;
volatile TimeValue timeOutValue;
final AtomicReference<ObservedState> lastObservedState;
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<ObservingContext>(null);
volatile long startTime;
volatile Long startTime;
volatile boolean timedOut;
volatile TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
this(clusterService, new TimeValue(60000), logger);
@ -65,10 +65,12 @@ public class ClusterStateObserver {
* will fail any existing or new #waitForNextChange calls.
*/
public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) {
this.timeOutValue = timeout;
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.startTime = System.currentTimeMillis();
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTime = System.currentTimeMillis();
}
this.logger = logger;
}
@ -108,19 +110,24 @@ public class ClusterStateObserver {
if (observingContext.get() != null) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
long timeoutTimeLeft;
Long timeoutTimeLeft;
if (timeOutValue == null) {
timeOutValue = this.timeOutValue;
long timeSinceStart = System.currentTimeMillis() - startTime;
timeoutTimeLeft = timeOutValue.millis() - timeSinceStart;
if (timeoutTimeLeft <= 0l) {
// things have timeout while we were busy -> notify
logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
// update to latest, in case people want to retry
timedOut = true;
lastObservedState.set(new ObservedState(clusterService.state()));
listener.onTimeout(timeOutValue);
return;
if (timeOutValue != null) {
long timeSinceStart = System.currentTimeMillis() - startTime;
timeoutTimeLeft = timeOutValue.millis() - timeSinceStart;
if (timeoutTimeLeft <= 0l) {
// things have timeout while we were busy -> notify
logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
// update to latest, in case people want to retry
timedOut = true;
lastObservedState.set(new ObservedState(clusterService.state()));
listener.onTimeout(timeOutValue);
return;
}
} else {
timeoutTimeLeft = null;
}
} else {
this.startTime = System.currentTimeMillis();
@ -143,7 +150,7 @@ public class ClusterStateObserver {
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
clusterService.add(new TimeValue(timeoutTimeLeft), clusterStateListener);
clusterService.add(timeoutTimeLeft == null ? null : new TimeValue(timeoutTimeLeft), clusterStateListener);
}
}

View File

@ -230,7 +230,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
@Override
public void add(final TimeValue timeout, final TimeoutClusterStateListener listener) {
public void add(@Nullable final TimeValue timeout, final TimeoutClusterStateListener listener) {
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
return;
@ -240,9 +240,11 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
updateTasksExecutor.execute(new TimedPrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
public void run() {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
onGoingTimeouts.add(notifyTimeout);
if (timeout != null) {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
onGoingTimeouts.add(notifyTimeout);
}
postAppliedListeners.add(listener);
listener.postAdded();
}

View File

@ -21,13 +21,29 @@ package org.elasticsearch.common;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import java.io.BufferedReader;
import java.util.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
/**
*
@ -1063,4 +1079,18 @@ public class Strings {
public static String base64UUID() {
return TIME_UUID_GENERATOR.getBase64UUID();
}
/**
* Return a {@link String} that is the json representation of the provided
* {@link ToXContent}.
*/
public static String toString(ToXContent toXContent) {
try {
XContentBuilder builder = JsonXContent.contentBuilder();
toXContent.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder.string();
} catch (IOException e) {
throw new AssertionError("Cannot happen", e);
}
}
}

View File

@ -692,12 +692,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
static class ProcessClusterState {
final ClusterState clusterState;
final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed;
volatile boolean processed;
ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
ProcessClusterState(ClusterState clusterState) {
this.clusterState = clusterState;
this.newStateProcessed = newStateProcessed;
}
}
@ -738,7 +736,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState);
processNewClusterStates.add(processClusterState);
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";

View File

@ -105,7 +105,7 @@ public class ZenDiscoveryUnitTest extends ElasticsearchTestCase {
int numUpdates = scaledRandomIntBetween(50, 100);
LinkedList<ProcessClusterState> queue = new LinkedList<>();
for (int i = 0; i < numUpdates; i++) {
queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build(), null));
queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build()));
}
ProcessClusterState mostRecent = queue.get(numUpdates - 1);
Collections.shuffle(queue, getRandom());
@ -121,15 +121,15 @@ public class ZenDiscoveryUnitTest extends ElasticsearchTestCase {
DiscoveryNodes nodes2 = DiscoveryNodes.builder().masterNodeId("b").build();
LinkedList<ProcessClusterState> queue = new LinkedList<>();
ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build(), null);
ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build());
queue.offer(thirdMostRecent);
ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build(), null);
ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build());
queue.offer(secondMostRecent);
ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build(), null);
ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build());
queue.offer(mostRecent);
Collections.shuffle(queue, getRandom());
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build(), null));
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build(), null));
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build()));
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build()));
assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState));

View File

@ -20,6 +20,10 @@
package org.elasticsearch.indices.state;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
@ -27,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -35,6 +40,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -42,10 +51,16 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
/**
*/
@ -115,4 +130,118 @@ public class RareClusterStateTests extends ElasticsearchIntegrationTest {
assertHitCount(client().prepareSearch("test").get(), 0);
}
public void testDelayedMappingPropagationOnReplica() throws Exception {
// Here we want to test that everything goes well if the mappings that
// are needed for a document are not available on the replica at the
// time of indexing it
final List<String> nodeNames = internalCluster().startNodesAsync(2).get();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
final String master = internalCluster().getMasterName();
assertThat(nodeNames, hasItem(master));
String otherNode = null;
for (String node : nodeNames) {
if (node.equals(master) == false) {
otherNode = node;
break;
}
}
assertNotNull(otherNode);
// Force allocation of the primary on the master node by first only allocating on the master
// and then allowing all nodes so that the replica gets allocated on the other node
assertAcked(prepareCreate("index").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", master)).get());
assertAcked(client().admin().indices().prepareUpdateSettings("index").setSettings(ImmutableSettings.builder()
.put("index.routing.allocation.include._name", "")).get());
ensureGreen();
// Check routing tables
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertEquals(master, state.nodes().masterNode().name());
List<ShardRouting> shards = state.routingTable().allShards("index");
assertThat(shards, hasSize(2));
for (ShardRouting shard : shards) {
if (shard.primary()) {
// primary must be on the master
assertEquals(state.nodes().masterNodeId(), shard.currentNodeId());
} else {
assertTrue(shard.active());
}
}
// Block cluster state processing on the replica
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, getRandom());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
final AtomicReference<Object> putMappingResponse = new AtomicReference<>();
client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
putMappingResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
putMappingResponse.set(e);
}
});
// Wait for mappings to be available on master
assertBusy(new Runnable() {
@Override
public void run() {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
final IndexService indexService = indicesService.indexServiceSafe("index");
assertNotNull(indexService);
final MapperService mapperService = indexService.mapperService();
DocumentMapper mapper = mapperService.documentMapper("type");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("field"));
}
});
final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
docIndexResponse.set(e);
}
});
// Wait for document to be indexed on primary
assertBusy(new Runnable() {
@Override
public void run() {
assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists());
}
});
// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
Thread.sleep(100);
assertThat(putMappingResponse.get(), equalTo(null));
assertThat(docIndexResponse.get(), equalTo(null));
// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
assertTrue(resp.isAcknowledged());
assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
IndexResponse docResp = (IndexResponse) docIndexResponse.get();
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
}
});
}
}