improve gateway recovery when using delay index creation, close the loophole when the master was shut down before the delay expired and now other node becoming master will do the recovery

This commit is contained in:
kimchy 2010-06-15 10:27:26 +03:00
parent 98df1b3433
commit 48979ab6c8
10 changed files with 56 additions and 54 deletions

View File

@ -22,7 +22,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.cluster.node.DiscoveryNodes;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ClusterChangedEvent {
@ -32,15 +32,12 @@ public class ClusterChangedEvent {
private final ClusterState state;
private final boolean firstMaster;
private final DiscoveryNodes.Delta nodesDelta;
public ClusterChangedEvent(String source, ClusterState state, ClusterState previousState, boolean firstMaster) {
public ClusterChangedEvent(String source, ClusterState state, ClusterState previousState) {
this.source = source;
this.state = state;
this.previousState = previousState;
this.firstMaster = firstMaster;
this.nodesDelta = state.nodes().delta(previousState.nodes());
}
@ -71,10 +68,6 @@ public class ClusterChangedEvent {
return state.nodes().localNodeMaster();
}
public boolean firstMaster() {
return firstMaster;
}
public DiscoveryNodes.Delta nodesDelta() {
return this.nodesDelta;
}

View File

@ -34,7 +34,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ClusterState {

View File

@ -59,6 +59,7 @@ public class MetaData implements Iterable<IndexMetaData> {
private final int maxNumberOfShardsPerNode;
private final transient int totalNumberOfShards;
private final boolean recoveredFromGateway;
private final String[] allIndices;
@ -67,8 +68,9 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
private final ImmutableMap<String, ImmutableSet<String>> aliasAndIndexToIndexMap2;
private MetaData(ImmutableMap<String, IndexMetaData> indices, int maxNumberOfShardsPerNode) {
private MetaData(ImmutableMap<String, IndexMetaData> indices, boolean recoveredFromGateway, int maxNumberOfShardsPerNode) {
this.indices = ImmutableMap.copyOf(indices);
this.recoveredFromGateway = recoveredFromGateway;
this.maxNumberOfShardsPerNode = maxNumberOfShardsPerNode;
int totalNumberOfShards = 0;
for (IndexMetaData indexMetaData : indices.values()) {
@ -123,6 +125,13 @@ public class MetaData implements Iterable<IndexMetaData> {
this.aliasAndIndexToIndexMap2 = aliasAndIndexToIndexBuilder2.immutableMap();
}
/**
* Has the cluster state been recovered from the gateway.
*/
public boolean recoveredFromGateway() {
return this.recoveredFromGateway;
}
public ImmutableSet<String> aliases() {
return this.aliases;
}
@ -238,6 +247,8 @@ public class MetaData implements Iterable<IndexMetaData> {
private MapBuilder<String, IndexMetaData> indices = newMapBuilder();
private boolean recoveredFromGateway = false;
public Builder put(IndexMetaData.Builder indexMetaDataBuilder) {
return put(indexMetaDataBuilder.build());
}
@ -257,7 +268,8 @@ public class MetaData implements Iterable<IndexMetaData> {
}
public Builder metaData(MetaData metaData) {
indices.putAll(metaData.indices);
this.indices.putAll(metaData.indices);
this.recoveredFromGateway = metaData.recoveredFromGateway();
return this;
}
@ -266,8 +278,16 @@ public class MetaData implements Iterable<IndexMetaData> {
return this;
}
/**
* Indicates that this cluster state has been recovered from the gateawy.
*/
public Builder markAsRecoveredFromGateway() {
this.recoveredFromGateway = true;
return this;
}
public MetaData build() {
return new MetaData(indices.immutableMap(), maxNumberOfShardsPerNode);
return new MetaData(indices.immutableMap(), recoveredFromGateway, maxNumberOfShardsPerNode);
}
public static String toXContent(MetaData metaData) throws IOException {
@ -321,6 +341,8 @@ public class MetaData implements Iterable<IndexMetaData> {
public static MetaData readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
builder.maxNumberOfShardsPerNode(in.readInt());
// we only serialize it using readFrom, not in to/from XContent
builder.recoveredFromGateway = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexMetaData.Builder.readFrom(in, globalSettings));
@ -330,6 +352,7 @@ public class MetaData implements Iterable<IndexMetaData> {
public static void writeTo(MetaData metaData, StreamOutput out) throws IOException {
out.writeInt(metaData.maxNumberOfShardsPerNode());
out.writeBoolean(metaData.recoveredFromGateway());
out.writeVInt(metaData.indices.size());
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder.writeTo(indexMetaData, out);

View File

@ -171,7 +171,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.debug("Cluster state updated, version [{}], source [{}]", clusterState.version(), source);
}
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState, discoveryService.firstMaster());
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {

View File

@ -37,11 +37,6 @@ public interface Discovery extends LifecycleComponent<Discovery> {
String nodeDescription();
/**
* Is the discovery of this node caused this node to be the first master in the cluster.
*/
boolean firstMaster();
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
* process should not publish this state to the master as well! (the master is sending it...).

View File

@ -19,11 +19,11 @@
package org.elasticsearch.discovery;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.CountDownLatch;
@ -94,10 +94,6 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
return discovery.nodeDescription();
}
public boolean firstMaster() {
return discovery.firstMaster();
}
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
* process should not publish this state to the master as well! (the master is sending it...).

View File

@ -59,8 +59,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private volatile boolean master = false;
private volatile boolean firstMaster = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
@ -91,7 +89,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (clusterGroup.members().size() == 1) {
// we are the first master (and the master)
master = true;
firstMaster = true;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
@ -180,10 +177,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
return clusterName.value() + "/" + localNode.id();
}
@Override public boolean firstMaster() {
return firstMaster;
}
@Override public void publish(ClusterState clusterState) {
if (!master) {
throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");

View File

@ -87,8 +87,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private volatile boolean master = false;
private volatile boolean firstMaster = false;
private volatile DiscoveryNodes latestDiscoNodes;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@ -198,10 +196,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return clusterName.value() + "/" + localNode.id();
}
@Override public boolean firstMaster() {
return firstMaster;
}
@Override public DiscoveryNodes nodes() {
DiscoveryNodes latestNodes = this.latestDiscoNodes;
if (latestNodes != null) {
@ -226,8 +220,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
retry = false;
DiscoveryNode masterNode = broadPingTillMasterResolved();
if (localNode.equals(masterNode)) {
// we are the master (first)
this.firstMaster = true;
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@ -247,7 +239,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
});
} else {
this.firstMaster = false;
this.master = false;
try {
// first, make sure we can connect to the master

View File

@ -66,7 +66,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final TimeValue delayIndexCreation;
private final AtomicBoolean firstMasterRead = new AtomicBoolean();
private final AtomicBoolean readFromGateway = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService,
ThreadPool threadPool, MetaDataService metaDataService) {
@ -78,8 +78,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
this.metaDataService = metaDataService;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
// allow to control a delay of when indices will get created
// TODO we need to maintain, on the cluster state, a flag that states if it was read from the gateway or not
// so if we delay, and the first master failed to start, others will load it
this.delayIndexCreation = componentSettings.getAsTime("delay_index_creation", null);
}
@ -89,8 +87,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// if we received initial state, see if we can recover within the start phase, so we hold the
// node from starting until we recovered properly
if (discoveryService.initialStateReceived()) {
if (discoveryService.firstMaster()) {
if (firstMasterRead.compareAndSet(false, true)) {
ClusterState clusterState = clusterService.state();
if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) {
if (readFromGateway.compareAndSet(false, true)) {
Boolean waited = readFromGateway(initialStateTimeout);
if (waited != null && !waited) {
logger.warn("Waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout);
@ -119,8 +118,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
if (lifecycle.started() && event.localNodeMaster()) {
if (event.firstMaster() && firstMasterRead.compareAndSet(false, true)) {
if (!lifecycle.started()) {
return;
}
if (event.localNodeMaster()) {
if (!event.state().metaData().recoveredFromGateway() && readFromGateway.compareAndSet(false, true)) {
executor.execute(new Runnable() {
@Override public void run() {
readFromGateway(null);
@ -162,10 +164,12 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
metaData = gateway.read();
} catch (Exception e) {
logger.error("Failed to read from gateway", e);
markMetaDataAsReadFromGateway("failure");
return false;
}
if (metaData == null) {
logger.debug("No state read from gateway");
markMetaDataAsReadFromGateway("no state");
return true;
}
final MetaData fMetaData = metaData;
@ -194,11 +198,25 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
return null;
}
private void markMetaDataAsReadFromGateway(String reason) {
clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData())
// mark the metadata as read from gateway
.markAsRecoveredFromGateway();
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();
}
});
}
private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) {
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData()).maxNumberOfShardsPerNode(fMetaData.maxNumberOfShardsPerNode());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : fMetaData) {

View File

@ -76,8 +76,6 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
private DiscoveryNode localNode;
private volatile boolean firstMaster = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
@ -150,7 +148,6 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
this.localNode = new DiscoveryNode(settings.get("name"), channel.getAddress().toString(), transportService.boundAddress().publishAddress(), buildCommonNodesAttributes(settings));
if (isMaster()) {
firstMaster = true;
clusterService.submitStateUpdateTask("jgroups-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
@ -205,10 +202,6 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
return channel.getClusterName() + "/" + channel.getAddress();
}
@Override public boolean firstMaster() {
return firstMaster;
}
@Override public void publish(ClusterState clusterState) {
if (!isMaster()) {
throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");