Refactor master node change predicate for reuse

This commit migrates a ClusterStateObserver.ChangePredicate for
detecting a master node change into a separate class for reuse
elsewhere.
This commit is contained in:
Jason Tedor 2016-01-03 12:39:25 -05:00
parent 6a12b5e59a
commit c47340f2f1
2 changed files with 44 additions and 18 deletions

View File

@ -26,10 +26,10 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -50,20 +50,6 @@ import java.util.function.Supplier;
* A base class for operations that needs to be performed on the master node. * A base class for operations that needs to be performed on the master node.
*/ */
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> { public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
private static final ClusterStateObserver.ChangePredicate masterNodeChangedPredicate = new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
// The condition !newState.nodes().masterNodeId().equals(previousState.nodes().masterNodeId()) is not sufficient as the same master node might get reelected after a disruption.
return newState.nodes().masterNodeId() != null && newState != previousState;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
};
protected final TransportService transportService; protected final TransportService transportService;
protected final ClusterService clusterService; protected final ClusterService clusterService;
@ -148,7 +134,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
if (t instanceof Discovery.FailedToCommitClusterStateException if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) { || (t instanceof NotMasterException)) {
logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName); logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName);
retry(t, masterNodeChangedPredicate); retry(t, MasterNodeChangePredicate.INSTANCE);
} else { } else {
listener.onFailure(t); listener.onFailure(t);
} }
@ -164,7 +150,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
} else { } else {
if (nodes.masterNode() == null) { if (nodes.masterNode() == null) {
logger.debug("no known master node, scheduling a retry"); logger.debug("no known master node, scheduling a retry");
retry(null, masterNodeChangedPredicate); retry(null, MasterNodeChangePredicate.INSTANCE);
} else { } else {
transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) { transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
@Override @Override
@ -179,7 +165,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
// we want to retry here a bit to see if a new master is elected // we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
actionName, nodes.masterNode(), exp.getDetailedMessage()); actionName, nodes.masterNode(), exp.getDetailedMessage());
retry(cause, masterNodeChangedPredicate); retry(cause, MasterNodeChangePredicate.INSTANCE);
} else { } else {
listener.onFailure(exp); listener.onFailure(exp);
} }

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
public enum MasterNodeChangePredicate implements ClusterStateObserver.ChangePredicate {
INSTANCE;
@Override
public boolean apply(
ClusterState previousState,
ClusterState.ClusterStateStatus previousStatus,
ClusterState newState,
ClusterState.ClusterStateStatus newStatus) {
// checking if the masterNodeId changed is insufficient as the
// same master node might get re-elected after a disruption
return newState.nodes().masterNodeId() != null && newState != previousState;
}
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
return changedEvent.nodesDelta().masterNodeChanged();
}
}