Simplify Discovery interface (#24608)

- Removes clusterState, getInitialClusterState and getMinimumMasterNodes methods from Discovery interface.
- Sets PingContextProvider in ZenPing constructor
- Renames state in ZenDiscovery to committedState
This commit is contained in:
Yannick Welsch 2017-05-12 14:08:14 +02:00 committed by GitHub
parent b7976bd536
commit 04e08f5e49
20 changed files with 175 additions and 205 deletions

View File

@ -24,8 +24,13 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
import java.util.function.Supplier;
@FunctionalInterface
public interface ClusterApplier {
/**
* Sets the initial state for this applier. Should only be called once.
* @param initialState the initial state to set
*/
void setInitialState(ClusterState initialState);
/**
* Method to invoke when a new cluster state is available to be applied
*

View File

@ -116,6 +116,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
this.nodeConnectionsService = nodeConnectionsService;
}
@Override
public void setInitialState(ClusterState initialState) {
if (lifecycle.started()) {
throw new IllegalStateException("can't set initial state when started");

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
@ -48,18 +47,6 @@ public interface Discovery extends LifecycleComponent {
*/
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
/**
* Returns the initial cluster state provided by the discovery module. Used by
* {@link org.elasticsearch.cluster.service.ClusterApplierService} as initial applied state.
*/
ClusterState getInitialClusterState();
/**
* Returns latest cluster state used by the discovery module. Used by {@link org.elasticsearch.cluster.service.MasterService} to
* calculate the next prospective state to publish.
*/
ClusterState clusterState();
interface AckListener {
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
void onTimeout();
@ -90,9 +77,4 @@ public interface Discovery extends LifecycleComponent {
*/
void startInitialJoin();
/***
* @return the current value of minimum master nodes, or -1 for not set
*/
int getMinimumMasterNodes();
}

View File

@ -86,8 +86,8 @@ public class DiscoveryModule {
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, clusterApplier));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService).entrySet().forEach(entry -> {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
@ -44,26 +45,23 @@ import static org.elasticsearch.tribe.TribeService.TRIBE_WRITE_BLOCK;
public class TribeDiscovery extends SingleNodeDiscovery implements Discovery {
@Inject
public TribeDiscovery(Settings settings, TransportService transportService, ClusterApplier clusterApplier) {
super(settings, transportService, clusterApplier);
public TribeDiscovery(Settings settings, TransportService transportService,
MasterService masterService, ClusterApplier clusterApplier) {
super(settings, transportService, masterService, clusterApplier);
}
@Override
public synchronized ClusterState getInitialClusterState() {
if (initialState == null) {
ClusterBlocks.Builder clusterBlocks = ClusterBlocks.builder(); // don't add no_master / state recovery block
if (BLOCKS_WRITE_SETTING.get(settings)) {
clusterBlocks.addGlobalBlock(TRIBE_WRITE_BLOCK);
}
if (BLOCKS_METADATA_SETTING.get(settings)) {
clusterBlocks.addGlobalBlock(TRIBE_METADATA_BLOCK);
}
DiscoveryNode localNode = transportService.getLocalNode();
initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build())
.blocks(clusterBlocks).build();
protected ClusterState createInitialState(DiscoveryNode localNode) {
ClusterBlocks.Builder clusterBlocks = ClusterBlocks.builder(); // don't add no_master / state recovery block
if (BLOCKS_WRITE_SETTING.get(settings)) {
clusterBlocks.addGlobalBlock(TRIBE_WRITE_BLOCK);
}
return initialState;
if (BLOCKS_METADATA_SETTING.get(settings)) {
clusterBlocks.addGlobalBlock(TRIBE_METADATA_BLOCK);
}
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build())
.blocks(clusterBlocks).build();
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
@ -48,13 +49,13 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
protected final TransportService transportService;
private final ClusterApplier clusterApplier;
protected volatile ClusterState initialState;
private volatile ClusterState clusterState;
public SingleNodeDiscovery(final Settings settings, final TransportService transportService,
ClusterApplier clusterApplier) {
final MasterService masterService, final ClusterApplier clusterApplier) {
super(Objects.requireNonNull(settings));
this.transportService = Objects.requireNonNull(transportService);
masterService.setClusterStateSupplier(() -> clusterState);
this.clusterApplier = clusterApplier;
}
@ -82,7 +83,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
e);
}
};
clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", this::clusterState, listener);
clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, listener);
try {
latch.await();
@ -91,27 +92,6 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
}
}
@Override
public synchronized ClusterState getInitialClusterState() {
if (initialState == null) {
DiscoveryNode localNode = transportService.getLocalNode();
initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
.build())
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.build();
}
return initialState;
}
@Override
public ClusterState clusterState() {
return clusterState;
}
@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
@ -119,21 +99,32 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
@Override
public synchronized void startInitialJoin() {
if (lifecycle.started() == false) {
throw new IllegalStateException("can't start initial join when not started");
}
// apply a fresh cluster state just so that state recovery gets triggered by GatewayService
// TODO: give discovery module control over GatewayService
clusterState = ClusterState.builder(getInitialClusterState()).build();
clusterApplier.onNewClusterState("single-node-start-initial-join", this::clusterState, (source, e) -> {});
}
@Override
public int getMinimumMasterNodes() {
return 1;
clusterState = ClusterState.builder(clusterState).build();
clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> {});
}
@Override
protected synchronized void doStart() {
initialState = getInitialClusterState();
clusterState = initialState;
// set initial state
DiscoveryNode localNode = transportService.getLocalNode();
clusterState = createInitialState(localNode);
clusterApplier.setInitialState(clusterState);
}
protected ClusterState createInitialState(DiscoveryNode localNode) {
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
.build())
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.build();
}
@Override

View File

@ -116,7 +116,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final int limitPortCounts;
private volatile PingContextProvider contextProvider;
private final PingContextProvider contextProvider;
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
@ -137,12 +137,13 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private volatile boolean closed = false;
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider unicastHostsProvider) {
UnicastHostsProvider unicastHostsProvider, PingContextProvider contextProvider) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.hostsProvider = unicastHostsProvider;
this.contextProvider = contextProvider;
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
@ -260,8 +261,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
@Override
public void start(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
public void start() {
}
/**

View File

@ -143,9 +143,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final ClusterApplier clusterApplier;
private final AtomicReference<ClusterState> state; // last committed cluster state
private final AtomicReference<ClusterState> committedState; // last committed cluster state
private final Object stateMutex = new Object();
private volatile ClusterState initialState; // set lazily when discovery layer is started
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
@ -165,7 +164,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
this.threadPool = threadPool;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.state = new AtomicReference<>();
this.committedState = new AtomicReference<>();
this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);
@ -214,6 +213,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
masterService.setClusterStateSupplier(this::clusterState);
transportService.registerRequestHandler(
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}
@ -221,7 +222,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
// protected to allow overriding in tests
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider, this);
}
@Override
@ -229,12 +230,21 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
DiscoveryNode localNode = transportService.getLocalNode();
assert localNode != null;
synchronized (stateMutex) {
initialState = getInitialClusterState();
state.set(initialState);
// set initial state
assert committedState.get() == null;
assert localNode != null;
ClusterState initialState = ClusterState.builder(clusterName)
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
.addGlobalBlock(discoverySettings.getNoMasterBlock()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
.build();
committedState.set(initialState);
clusterApplier.setInitialState(initialState);
nodesFD.setLocalNode(localNode);
joinThreadControl.start();
}
zenPing.start(this);
zenPing.start();
}
@Override
@ -286,7 +296,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
@Override
public ClusterState clusterState() {
ClusterState clusterState = state.get();
ClusterState clusterState = committedState.get();
assert clusterState != null : "accessing cluster state before it is set";
return clusterState;
}
@ -297,7 +307,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
// state got changed locally (maybe because another master published to us)
if (clusterChangedEvent.previousState() != this.state.get()) {
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
}
@ -345,7 +355,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
});
synchronized (stateMutex) {
if (clusterChangedEvent.previousState() != this.state.get()) {
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes");
}
@ -371,22 +381,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
}
@Override
public synchronized ClusterState getInitialClusterState() {
if (initialState == null) {
assert state.get() == null;
DiscoveryNode localNode = transportService.getLocalNode();
assert localNode != null;
initialState = ClusterState.builder(clusterName)
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
.addGlobalBlock(discoverySettings.getNoMasterBlock()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
.build();
}
return initialState;
}
/**
* Gets the current set of nodes involved in the node fault detection.
* NB: for testing purposes
@ -405,11 +399,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
return discoverySettings;
}
@Override
public int getMinimumMasterNodes() {
return electMaster.minimumMasterNodes();
}
/**
* returns true if zen discovery is started and there is a currently a background thread active for (re)joining
* the cluster used for testing.
@ -548,9 +537,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
// visible for testing
void setState(ClusterState clusterState) {
void setCommittedState(ClusterState clusterState) {
synchronized (stateMutex) {
state.set(clusterState);
committedState.set(clusterState);
}
}
@ -693,7 +682,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
synchronized (stateMutex) {
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(state.get().nodes())) {
if (!electMaster.hasEnoughMasterNodes(committedState.get().nodes())) {
rejoin("not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]");
}
}
@ -712,7 +701,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
logger.info((Supplier<?>) () -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
synchronized (stateMutex) {
if (localNodeMaster() == false && masterNode.equals(state.get().nodes().getMasterNode())) {
if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
// flush any pending cluster states from old master, so it will not be set as master again
publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
rejoin("master left (reason = " + reason + ")");
@ -725,7 +714,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
assert Thread.holdsLock(stateMutex);
final ClusterState newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess();
final ClusterState currentState = state.get();
final ClusterState currentState = committedState.get();
final ClusterState adaptedNewClusterState;
// all pending states have been processed
if (newClusterState == null) {
@ -801,7 +790,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
return false;
}
state.set(adaptedNewClusterState);
committedState.set(adaptedNewClusterState);
// update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
// and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
@ -997,7 +986,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
protected void rejoin(String reason) {
assert Thread.holdsLock(stateMutex);
ClusterState clusterState = state.get();
ClusterState clusterState = committedState.get();
logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
nodesFD.stop();
@ -1021,7 +1010,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
.nodes(discoveryNodes)
.build();
state.set(clusterState);
committedState.set(clusterState);
clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
}
}
@ -1123,7 +1112,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
synchronized (stateMutex) {
ClusterState currentState = state.get();
ClusterState currentState = committedState.get();
if (currentState.nodes().isLocalNodeElectedMaster()) {
pingsWhileMaster.set(0);
handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");

View File

@ -40,7 +40,7 @@ import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK
public interface ZenPing extends Releasable {
void start(PingContextProvider contextProvider);
void start();
void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);

View File

@ -32,13 +32,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Supplier;
public class Gateway extends AbstractComponent implements ClusterStateApplier {
@ -48,18 +47,18 @@ public class Gateway extends AbstractComponent implements ClusterStateApplier {
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final Supplier<Integer> minimumMasterNodesProvider;
private final int minimumMasterNodes;
private final IndicesService indicesService;
public Gateway(Settings settings, ClusterService clusterService, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.metaState = metaState;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodesProvider = discovery::getMinimumMasterNodes;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
clusterService.addLowPriorityApplier(this);
}
@ -69,7 +68,7 @@ public class Gateway extends AbstractComponent implements ClusterStateApplier {
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
int requiredAllocation = Math.max(1, minimumMasterNodes);
if (nodesState.hasFailures()) {

View File

@ -42,7 +42,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@ -93,10 +92,10 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
@Inject
public GatewayService(Settings settings, AllocationService allocationService, ClusterService clusterService,
ThreadPool threadPool, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) {
super(settings);
this.gateway = new Gateway(settings, clusterService, metaState, listGatewayMetaState, discovery,
this.gateway = new Gateway(settings, clusterService, metaState, listGatewayMetaState,
indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
@ -227,10 +226,6 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
}
}
public Gateway getGateway() {
return gateway;
}
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {
@Override

View File

@ -684,11 +684,9 @@ public class Node implements Closeable {
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
clusterService.getMasterService().setClusterStateSupplier(discovery::clusterState);
clusterService.getClusterApplierService().setInitialState(discovery.getInitialClusterState());
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
discovery.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();

View File

@ -85,14 +85,6 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
final UnicastHostsProvider provider =
() -> Collections.singletonList(nodeTransport.getLocalNode());
final CountDownLatch latch = new CountDownLatch(1);
final UnicastZenPing unicastZenPing =
new UnicastZenPing(settings, threadPool, pingTransport, provider) {
@Override
protected void finishPingingRound(PingingRound pingingRound) {
latch.countDown();
super.finishPingingRound(pingingRound);
}
};
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
@ -100,7 +92,15 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
.build();
final ClusterName clusterName = new ClusterName(internalCluster().getClusterName());
final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build();
unicastZenPing.start(() -> state);
final UnicastZenPing unicastZenPing =
new UnicastZenPing(settings, threadPool, pingTransport, provider, () -> state) {
@Override
protected void finishPingingRound(PingingRound pingingRound) {
latch.countDown();
super.finishPingingRound(pingingRound);
}
};
unicastZenPing.start();
closeables.push(unicastZenPing);
final CompletableFuture<ZenPing.PingCollection> responses = new CompletableFuture<>();
unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3));

View File

@ -21,9 +21,12 @@ package org.elasticsearch.discovery.single;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@ -32,8 +35,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.createMasterService;
import static org.hamcrest.Matchers.equalTo;
public class SingleNodeDiscoveryTests extends ESTestCase {
@ -49,13 +54,26 @@ public class SingleNodeDiscoveryTests extends ESTestCase {
stack.push(transportService);
transportService.start();
final DiscoveryNode node = transportService.getLocalNode();
final ClusterService clusterService = createClusterService(threadPool, node);
stack.push(clusterService);
final MasterService masterService = createMasterService(threadPool, node);
AtomicReference<ClusterState> clusterState = new AtomicReference<>();
final SingleNodeDiscovery discovery =
new SingleNodeDiscovery(Settings.EMPTY, transportService,
clusterService.getClusterApplierService());
masterService, new ClusterApplier() {
@Override
public void setInitialState(ClusterState initialState) {
clusterState.set(initialState);
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
ClusterStateTaskListener listener) {
clusterState.set(clusterStateSupplier.get());
listener.clusterStateProcessed(source, clusterState.get(), clusterState.get());
}
});
discovery.start();
discovery.startInitialJoin();
final DiscoveryNodes nodes = discovery.getInitialClusterState().nodes();
final DiscoveryNodes nodes = clusterState.get().nodes();
assertThat(nodes.getSize(), equalTo(1));
assertThat(nodes.getMasterNode().getId(), equalTo(node.getId()));
} finally {

View File

@ -188,40 +188,40 @@ public class UnicastZenPingTests extends ESTestCase {
.build();
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
ClusterState stateC = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC,
EMPTY_HOSTS_PROVIDER) {
EMPTY_HOSTS_PROVIDER, () -> stateC) {
@Override
protected Version getVersion() {
return versionD;
}
};
ClusterState stateC = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
zenPingC.start(() -> stateC);
zenPingC.start();
closeables.push(zenPingC);
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD,
EMPTY_HOSTS_PROVIDER);
ClusterState stateD = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D"))
.build();
zenPingD.start(() -> stateD);
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD,
EMPTY_HOSTS_PROVIDER, () -> stateD);
zenPingD.start();
closeables.push(zenPingD);
logger.info("ping from UZP_A");
@ -311,26 +311,26 @@ public class UnicastZenPingTests extends ESTestCase {
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER);
ClusterState stateC = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
zenPingC.start(() -> stateC);
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER, () -> stateC);
zenPingC.start();
closeables.push(zenPingC);
// the presence of an unresolvable host should not prevent resolvable hosts from being pinged
@ -609,19 +609,19 @@ public class UnicastZenPingTests extends ESTestCase {
}
});
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
final ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A"))
.build();
zenPingA.start(() -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
final ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
zenPingB.start(() -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
@ -660,15 +660,15 @@ public class UnicastZenPingTests extends ESTestCase {
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
zenPingA.start(() -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
// Node B doesn't know about A!
final ClusterState stateB = ClusterState.builder(state).nodes(
DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
zenPingB.start(() -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
{
@ -796,9 +796,9 @@ public class UnicastZenPingTests extends ESTestCase {
private static class TestUnicastZenPing extends UnicastZenPing {
TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle,
UnicastHostsProvider unicastHostsProvider) {
UnicastHostsProvider unicastHostsProvider, PingContextProvider contextProvider) {
super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(),
threadPool, networkHandle.transportService, unicastHostsProvider);
threadPool, networkHandle.transportService, unicastHostsProvider, contextProvider);
}
volatile CountDownLatch allTasksCompleted;

View File

@ -26,6 +26,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -37,6 +39,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
@ -67,7 +70,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -195,7 +198,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
ZenDiscovery masterZen = buildZenDiscovery(
settingsWithClusterName,
masterTransport, masterMasterService, threadPool);
masterZen.setState(state);
masterZen.setCommittedState(state);
toClose.addFirst(masterZen);
masterTransport.acceptIncomingRequests();
@ -209,7 +212,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
MasterService otherMasterService = ClusterServiceUtils.createMasterService(threadPool, otherNode);
toClose.addFirst(otherMasterService);
ZenDiscovery otherZen = buildZenDiscovery(settingsWithClusterName, otherTransport, otherMasterService, threadPool);
otherZen.setState(otherState);
otherZen.setCommittedState(otherState);
toClose.addFirst(otherZen);
otherTransport.acceptIncomingRequests();
@ -262,7 +265,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
toClose.addFirst(masterMasterService);
state = ClusterState.builder(discoveryState(masterMasterService).getClusterName()).nodes(state.nodes()).build();
ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterMasterService, threadPool);
masterZen.setState(state);
masterZen.setCommittedState(state);
toClose.addFirst(masterZen);
masterTransport.acceptIncomingRequests();
@ -297,9 +300,19 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, MasterService masterService,
ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterApplier clusterApplier = new ClusterApplier() {
@Override
public void setInitialState(ClusterState initialState) {
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, (source, clusterStateSupplier, listener) -> listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get()),
clusterSettings, Collections::emptyList, null);
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService());
zenDiscovery.start();
return zenDiscovery;
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.NoopDiscovery;
import org.hamcrest.Matchers;
import java.io.IOException;
@ -36,7 +35,7 @@ public class GatewayServiceTests extends ESTestCase {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null);
return new GatewayService(settings.build(),
null, clusterService, null, null, null, new NoopDiscovery(), null);
null, clusterService, null, null, null, null);
}
public void testDefaultRecoverAfterTime() throws IOException {

View File

@ -19,8 +19,6 @@
package org.elasticsearch.test;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.discovery.Discovery;
@ -33,16 +31,6 @@ public class NoopDiscovery implements Discovery {
}
@Override
public ClusterState getInitialClusterState() {
return null;
}
@Override
public ClusterState clusterState() {
return null;
}
@Override
public DiscoveryStats stats() {
return null;
@ -53,11 +41,6 @@ public class NoopDiscovery implements Discovery {
}
@Override
public int getMinimumMasterNodes() {
return -1;
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -43,16 +43,15 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
/** a set of the last discovered pings. used to throttle busy spinning where MockZenPing will keep returning the same results */
private Set<MockZenPing> lastDiscoveredPings = null;
private volatile PingContextProvider contextProvider;
private final PingContextProvider contextProvider;
public MockZenPing(Settings settings) {
public MockZenPing(Settings settings, PingContextProvider contextProvider) {
super(settings);
this.contextProvider = contextProvider;
}
@Override
public void start(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
assert contextProvider != null;
public void start() {
synchronized (activeNodesPerCluster) {
boolean added = getActiveNodesForCurrentCluster().add(this);
assert added;

View File

@ -90,7 +90,7 @@ public class TestZenDiscovery extends ZenDiscovery {
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
if (USE_MOCK_PINGS.get(settings)) {
return new MockZenPing(settings);
return new MockZenPing(settings, this);
} else {
return super.newZenPing(settings, threadPool, transportService, hostsProvider);
}