add a proper master not discovered exception for master based operations (cluster state) and the ability to control the timeout till master is found

This commit is contained in:
kimchy 2010-07-21 01:51:58 +03:00
parent a2ceaa91cc
commit 39e546b64d
12 changed files with 140 additions and 3 deletions

View File

@ -95,6 +95,7 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
filterRoutingTable = in.readBoolean();
filterNodes = in.readBoolean();
filterMetaData = in.readBoolean();
@ -109,6 +110,7 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(filterRoutingTable);
out.writeBoolean(filterNodes);
out.writeBoolean(filterMetaData);

View File

@ -230,6 +230,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
cause = in.readUTF();
index = in.readUTF();
settings = readSettingsFromStream(in);
@ -241,6 +242,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(cause);
out.writeUTF(index);
writeSettingsToStream(settings, out);

View File

@ -92,11 +92,13 @@ public class DeleteIndexRequest extends MasterNodeOperationRequest {
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readUTF();
timeout = readTimeValue(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(index);
timeout.writeTo(out);
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
@ -32,6 +33,8 @@ import java.io.IOException;
*/
public abstract class MasterNodeOperationRequest implements ActionRequest {
private TimeValue masterNodeTimeout = TimeValue.timeValueSeconds(30);
@Override public boolean listenerThreaded() {
// always threaded
return true;
@ -42,9 +45,23 @@ public abstract class MasterNodeOperationRequest implements ActionRequest {
return this;
}
/**
* A timeout value in case the master has not been discovered yet or disconnected.
*/
public MasterNodeOperationRequest masterNodeTimeout(TimeValue timeout) {
this.masterNodeTimeout = timeout;
return this;
}
public TimeValue masterNodeTimeout() {
return this.masterNodeTimeout;
}
@Override public void readFrom(StreamInput in) throws IOException {
masterNodeTimeout = TimeValue.readTimeValue(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
masterNodeTimeout.writeTo(out);
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
@ -31,6 +30,7 @@ import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeCloseException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -94,7 +94,38 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
});
} else {
if (nodes.masterNode() == null) {
throw new ElasticSearchIllegalStateException("No master node discovered or set");
if (retrying) {
listener.onFailure(new MasterNotDiscoveredException());
} else {
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (clusterStateV2.nodes().masterNodeId() != null) {
// now we have a master, try and execute it...
clusterService.remove(this);
innerExecute(request, listener, true);
}
}
@Override public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeCloseException(nodes.localNode()));
}
@Override public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(new MasterNotDiscoveredException());
}
@Override public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, true);
}
}
});
}
return;
}
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() {
@ -112,7 +143,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
} else {
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
clusterService.add(TimeValue.timeValueSeconds(30), new TimeoutClusterStateListener() {
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (!clusterState.nodes().masterNodeId().equals(clusterStateV2.nodes().masterNodeId())) {

View File

@ -41,6 +41,14 @@ public class ClusterHealthRequestBuilder extends BaseClusterRequestBuilder<Clust
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public ClusterHealthRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
public ClusterHealthRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;

View File

@ -59,6 +59,15 @@ public class NodesShutdownRequestBuilder extends BaseClusterRequestBuilder<Nodes
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public NodesShutdownRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
@Override protected void doExecute(ActionListener<NodesShutdownResponse> listener) {
client.nodesShutdown(request, listener);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
@ -56,6 +57,14 @@ public class IndicesAliasesRequestBuilder extends BaseIndicesRequestBuilder<Indi
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public IndicesAliasesRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
@Override protected void doExecute(ActionListener<IndicesAliasesResponse> listener) {
client.aliases(request, listener);
}

View File

@ -130,6 +130,14 @@ public class CreateIndexRequestBuilder extends BaseIndicesRequestBuilder<CreateI
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public CreateIndexRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
@Override protected void doExecute(ActionListener<CreateIndexResponse> listener) {
client.create(request, listener);
}

View File

@ -53,6 +53,15 @@ public class DeleteIndexRequestBuilder extends BaseIndicesRequestBuilder<DeleteI
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public DeleteIndexRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
@Override protected void doExecute(ActionListener<DeleteIndexResponse> listener) {
client.delete(request, listener);
}

View File

@ -94,6 +94,14 @@ public class PutMappingRequestBuilder extends BaseIndicesRequestBuilder<PutMappi
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public PutMappingRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
/**
* If there is already a mapping definition registered against the type, then it will be merged. If there are
* elements that can't be merged are detected, the request will be rejected unless the

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.ElasticSearchException;
/**
* @author kimchy (shay.banon)
*/
public class MasterNotDiscoveredException extends ElasticSearchException {
public MasterNotDiscoveredException() {
super("");
}
}