Migrate ZenDiscoveryIT to Zen2 (#37465)
ZenDiscoveryIT contained 5 tests. 3 run without changes, testNodeRejectsClusterStateWithWrongMasterNode removed, testHandleNodeJoin_incompatibleClusterState changed.
This commit is contained in:
parent
170d7413d0
commit
9e7fd8caed
|
@ -397,6 +397,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
|
||||
assert Thread.holdsLock(mutex) == false;
|
||||
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
|
||||
|
@ -413,30 +414,37 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
|
||||
stateForJoinValidation.getNodes().getMinNodeVersion());
|
||||
}
|
||||
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
|
||||
|
||||
// validate the join on the joining node, will throw a failure if it fails the validation
|
||||
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
try {
|
||||
processJoinRequest(joinRequest, joinCallback);
|
||||
} catch (Exception e) {
|
||||
joinCallback.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]",
|
||||
joinRequest.getSourceNode()), e);
|
||||
joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
processJoinRequest(joinRequest, joinCallback);
|
||||
}
|
||||
}
|
||||
|
||||
// package private for tests
|
||||
void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest,
|
||||
JoinHelper.JoinCallback joinCallback) {
|
||||
// validate the join on the joining node, will throw a failure if it fails the validation
|
||||
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
|
||||
@Override
|
||||
public void onResponse(Empty empty) {
|
||||
try {
|
||||
processJoinRequest(joinRequest, joinCallback);
|
||||
} catch (Exception e) {
|
||||
joinCallback.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]",
|
||||
joinRequest.getSourceNode()), e);
|
||||
joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
|
||||
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
|
||||
synchronized (mutex) {
|
||||
|
|
|
@ -17,9 +17,8 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
package org.elasticsearch.cluster.coordination;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
|
@ -27,41 +26,32 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryStats;
|
||||
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.TestCustomMetaData;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BytesTransportRequest;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -73,13 +63,6 @@ import static org.hamcrest.Matchers.notNullValue;
|
|||
@TestLogging("_root:DEBUG")
|
||||
public class ZenDiscoveryIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // Zen1-specific stuff in some tests
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception {
|
||||
Settings defaultSettings = Settings.builder()
|
||||
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s")
|
||||
|
@ -122,7 +105,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||
assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster));
|
||||
}
|
||||
|
||||
public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException {
|
||||
public void testNodeFailuresAreProcessedOnce() throws IOException {
|
||||
Settings defaultSettings = Settings.builder()
|
||||
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s")
|
||||
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1")
|
||||
|
@ -161,78 +144,39 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||
assertThat(numUpdates.get(), either(equalTo(1)).or(equalTo(2))); // due to batching, both nodes can be handled in same CS update
|
||||
}
|
||||
|
||||
public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception {
|
||||
List<String> nodeNames = internalCluster().startNodes(2);
|
||||
|
||||
List<String> nonMasterNodes = new ArrayList<>(nodeNames);
|
||||
nonMasterNodes.remove(internalCluster().getMasterName());
|
||||
String noneMasterNode = nonMasterNodes.get(0);
|
||||
|
||||
ClusterState state = internalCluster().getInstance(ClusterService.class).state();
|
||||
DiscoveryNode node = null;
|
||||
for (DiscoveryNode discoveryNode : state.nodes()) {
|
||||
if (discoveryNode.getName().equals(noneMasterNode)) {
|
||||
node = discoveryNode;
|
||||
}
|
||||
}
|
||||
assert node != null;
|
||||
|
||||
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(state.nodes())
|
||||
.add(new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(),
|
||||
emptySet(), Version.CURRENT)).masterNodeId("abc");
|
||||
ClusterState.Builder builder = ClusterState.builder(state);
|
||||
builder.nodes(nodes);
|
||||
BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.getVersion());
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Exception> reference = new AtomicReference<>();
|
||||
internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME,
|
||||
new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
super.handleResponse(response);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
super.handleException(exp);
|
||||
reference.set(exp);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
latch.await();
|
||||
assertThat(reference.get(), notNullValue());
|
||||
assertThat(ExceptionsHelper.detailedMessage(reference.get()),
|
||||
containsString("cluster state from a different master than the current one, rejecting"));
|
||||
}
|
||||
|
||||
public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException {
|
||||
String masterOnlyNode = internalCluster().startMasterOnlyNode();
|
||||
public void testHandleNodeJoin_incompatibleClusterState()
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
String masterNode = internalCluster().startMasterOnlyNode();
|
||||
String node1 = internalCluster().startNode();
|
||||
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode);
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
|
||||
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, masterNode);
|
||||
final ClusterState state = clusterService.state();
|
||||
MetaData.Builder mdBuilder = MetaData.builder(state.metaData());
|
||||
mdBuilder.putCustom(CustomMetaData.TYPE, new CustomMetaData("data"));
|
||||
ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build();
|
||||
|
||||
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
|
||||
final CompletableFuture<Throwable> future = new CompletableFuture<>();
|
||||
DiscoveryNode node = state.nodes().getLocalNode();
|
||||
zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() {
|
||||
|
||||
coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()),
|
||||
new JoinHelper.JoinCallback() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
future.completeExceptionally(new AssertionError("onSuccess should not be called"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
holder.set((IllegalStateException) e);
|
||||
future.complete(e);
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(holder.get(), notNullValue());
|
||||
assertThat(holder.get().getMessage(), equalTo("failure when sending a validation request to node"));
|
||||
Throwable t = future.get(10, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(t instanceof IllegalStateException);
|
||||
assertTrue(t.getCause() instanceof RemoteTransportException);
|
||||
assertTrue(t.getCause().getCause() instanceof IllegalArgumentException);
|
||||
assertThat(t.getCause().getCause().getMessage(), containsString("Unknown NamedWriteable"));
|
||||
}
|
||||
|
||||
public static class CustomMetaData extends TestCustomMetaData {
|
Loading…
Reference in New Issue