mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Merge branch '7.x' of github.com:elastic/elasticsearch into 7.x
This commit is contained in:
commit
5aeb736801
@ -75,20 +75,22 @@ nodes. For example:
|
|||||||
cluster:
|
cluster:
|
||||||
remote:
|
remote:
|
||||||
cluster_one: <1>
|
cluster_one: <1>
|
||||||
seeds: 127.0.0.1:9300
|
seeds: 127.0.0.1:9300 <2>
|
||||||
transport.ping_schedule: 30s <2>
|
transport.ping_schedule: 30s <3>
|
||||||
cluster_two:
|
cluster_two:
|
||||||
seeds: 127.0.0.1:9301
|
seeds: 127.0.0.1:9301
|
||||||
transport.compress: true <3>
|
transport.compress: true <4>
|
||||||
skip_unavailable: true <4>
|
skip_unavailable: true <5>
|
||||||
|
|
||||||
--------------------------------
|
--------------------------------
|
||||||
<1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing
|
<1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing
|
||||||
the connection to each cluster. These names are subsequently used to distinguish
|
the connection to each cluster. These names are subsequently used to distinguish
|
||||||
between local and remote indices.
|
between local and remote indices.
|
||||||
<2> A keep-alive ping is configured for `cluster_one`.
|
<2> The hostname and <<modules-transport,transport>> port (default: 9300) of a
|
||||||
<3> Compression is explicitly enabled for requests to `cluster_two`.
|
seed node in the remote cluster.
|
||||||
<4> Disconnected remote clusters are optional for `cluster_two`.
|
<3> A keep-alive ping is configured for `cluster_one`.
|
||||||
|
<4> Compression is explicitly enabled for requests to `cluster_two`.
|
||||||
|
<5> Disconnected remote clusters are optional for `cluster_two`.
|
||||||
|
|
||||||
For more information about the optional transport settings, see
|
For more information about the optional transport settings, see
|
||||||
<<modules-transport>>.
|
<<modules-transport>>.
|
||||||
|
@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
|||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||||
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
||||||
@ -75,6 +74,13 @@ public class PublicationTransportHandler {
|
|||||||
|
|
||||||
private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
|
private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
|
||||||
|
|
||||||
|
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
|
||||||
|
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
|
||||||
|
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
|
||||||
|
// snapshot code).
|
||||||
|
// TODO: look into these and check how to get rid of them
|
||||||
|
private AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();
|
||||||
|
|
||||||
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
|
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
|
||||||
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
|
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
|
||||||
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
|
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
|
||||||
@ -179,32 +185,32 @@ public class PublicationTransportHandler {
|
|||||||
return new PublicationContext() {
|
return new PublicationContext() {
|
||||||
@Override
|
@Override
|
||||||
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
|
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
|
||||||
ActionListener<PublishWithJoinResponse> responseActionListener) {
|
ActionListener<PublishWithJoinResponse> originalListener) {
|
||||||
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
|
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
|
||||||
|
final ActionListener<PublishWithJoinResponse> responseActionListener;
|
||||||
if (destination.equals(nodes.getLocalNode())) {
|
if (destination.equals(nodes.getLocalNode())) {
|
||||||
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
|
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
|
||||||
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
|
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
|
||||||
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
|
assert previousRequest == null;
|
||||||
// snapshot code).
|
responseActionListener = new ActionListener<PublishWithJoinResponse>() {
|
||||||
// TODO: look into these and check how to get rid of them
|
@Override
|
||||||
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
|
public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
|
||||||
|
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
|
||||||
|
assert previousRequest == publishRequest;
|
||||||
|
originalListener.onResponse(publishWithJoinResponse);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
// wrap into fake TransportException, as that's what we expect in Publication
|
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
|
||||||
responseActionListener.onFailure(new TransportException(e));
|
assert previousRequest == publishRequest;
|
||||||
|
originalListener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
};
|
||||||
@Override
|
} else {
|
||||||
protected void doRun() {
|
responseActionListener = originalListener;
|
||||||
responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
|
}
|
||||||
}
|
if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "publish to self of " + publishRequest;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
|
|
||||||
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
|
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
|
||||||
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
|
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
|
||||||
} else {
|
} else {
|
||||||
@ -314,10 +320,6 @@ public class PublicationTransportHandler {
|
|||||||
Map<Version, BytesReference> serializedDiffs) {
|
Map<Version, BytesReference> serializedDiffs) {
|
||||||
Diff<ClusterState> diff = null;
|
Diff<ClusterState> diff = null;
|
||||||
for (DiscoveryNode node : discoveryNodes) {
|
for (DiscoveryNode node : discoveryNodes) {
|
||||||
if (node.equals(discoveryNodes.getLocalNode())) {
|
|
||||||
// ignore, see newPublicationContext
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
|
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
|
||||||
if (serializedStates.containsKey(node.getVersion()) == false) {
|
if (serializedStates.containsKey(node.getVersion()) == false) {
|
||||||
@ -403,7 +405,7 @@ public class PublicationTransportHandler {
|
|||||||
fullClusterStateReceivedCount.incrementAndGet();
|
fullClusterStateReceivedCount.incrementAndGet();
|
||||||
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
|
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
|
||||||
request.bytes().length());
|
request.bytes().length());
|
||||||
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
|
final PublishWithJoinResponse response = acceptState(incomingState);
|
||||||
lastSeenClusterState.set(incomingState);
|
lastSeenClusterState.set(incomingState);
|
||||||
return response;
|
return response;
|
||||||
} else {
|
} else {
|
||||||
@ -413,7 +415,7 @@ public class PublicationTransportHandler {
|
|||||||
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
|
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
|
||||||
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
||||||
} else {
|
} else {
|
||||||
final ClusterState incomingState;
|
ClusterState incomingState;
|
||||||
try {
|
try {
|
||||||
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
|
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
|
||||||
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
|
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
|
||||||
@ -427,7 +429,7 @@ public class PublicationTransportHandler {
|
|||||||
compatibleClusterStateDiffReceivedCount.incrementAndGet();
|
compatibleClusterStateDiffReceivedCount.incrementAndGet();
|
||||||
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
||||||
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
||||||
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
|
final PublishWithJoinResponse response = acceptState(incomingState);
|
||||||
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
|
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@ -436,4 +438,17 @@ public class PublicationTransportHandler {
|
|||||||
IOUtils.close(in);
|
IOUtils.close(in);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private PublishWithJoinResponse acceptState(ClusterState incomingState) {
|
||||||
|
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
|
||||||
|
if (transportService.getLocalNode().equals(incomingState.nodes().getMasterNode())) {
|
||||||
|
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
|
||||||
|
if (publishRequest == null || publishRequest.getAcceptedState().stateUUID().equals(incomingState.stateUUID()) == false) {
|
||||||
|
throw new IllegalStateException("publication to self failed for " + publishRequest);
|
||||||
|
} else {
|
||||||
|
return handlePublishRequest.apply(publishRequest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return handlePublishRequest.apply(new PublishRequest(incomingState));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Request{" +
|
return "RetentionLeaseBackgroundSyncAction.Request{" +
|
||||||
"retentionLeases=" + retentionLeases +
|
"retentionLeases=" + retentionLeases +
|
||||||
", shardId=" + shardId +
|
", shardId=" + shardId +
|
||||||
", timeout=" + timeout +
|
", timeout=" + timeout +
|
||||||
|
@ -182,7 +182,7 @@ public class RetentionLeaseSyncAction extends
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Request{" +
|
return "RetentionLeaseSyncAction.Request{" +
|
||||||
"retentionLeases=" + retentionLeases +
|
"retentionLeases=" + retentionLeases +
|
||||||
", shardId=" + shardId +
|
", shardId=" + shardId +
|
||||||
", timeout=" + timeout +
|
", timeout=" + timeout +
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
|
|
||||||
package org.elasticsearch.cluster;
|
package org.elasticsearch.cluster;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
@ -36,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
@ -57,8 +60,10 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
@ -215,6 +220,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||||||
assertConnectedExactlyToNodes(targetNodes);
|
assertConnectedExactlyToNodes(targetNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@TestLogging("org.elasticsearch.cluster.NodeConnectionsService:TRACE") // for https://github.com/elastic/elasticsearch/issues/40170
|
||||||
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
|
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
|
||||||
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
|
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
|
||||||
|
|
||||||
@ -227,7 +233,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||||||
assertConnectedExactlyToNodes(nodes0);
|
assertConnectedExactlyToNodes(nodes0);
|
||||||
|
|
||||||
// connection attempts to node0 block indefinitely
|
// connection attempts to node0 block indefinitely
|
||||||
final CyclicBarrier connectionBarrier = new CyclicBarrier(2);
|
final CyclicBarrier connectionBarrier = new VerboseCyclicBarrier(2);
|
||||||
try {
|
try {
|
||||||
nodeConnectionBlocks.put(node0, connectionBarrier::await);
|
nodeConnectionBlocks.put(node0, connectionBarrier::await);
|
||||||
transportService.disconnectFromNode(node0);
|
transportService.disconnectFromNode(node0);
|
||||||
@ -297,6 +303,33 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tracing barrier usage for https://github.com/elastic/elasticsearch/issues/40170
|
||||||
|
private class VerboseCyclicBarrier extends CyclicBarrier {
|
||||||
|
VerboseCyclicBarrier(int parties) {
|
||||||
|
super(parties);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int await() throws InterruptedException, BrokenBarrierException {
|
||||||
|
final String waitUUID = UUIDs.randomBase64UUID(random());
|
||||||
|
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
|
||||||
|
new ElasticsearchException("stack trace for CyclicBarrier#await()"));
|
||||||
|
final int result = super.await();
|
||||||
|
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
|
||||||
|
final String waitUUID = UUIDs.randomBase64UUID(random());
|
||||||
|
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
|
||||||
|
new ElasticsearchException("stack trace for CyclicBarrier#await(" + timeout + ", " + unit + ')'));
|
||||||
|
final int result = super.await(timeout, unit);
|
||||||
|
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
|
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
|
||||||
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
|
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
|
||||||
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
|
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
|
||||||
|
@ -771,23 +771,12 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
|
|||||||
|
|
||||||
for (ClusterNode cn : cluster.clusterNodes) {
|
for (ClusterNode cn : cluster.clusterNodes) {
|
||||||
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
|
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
|
||||||
if (cn == leader) {
|
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
||||||
// leader does not update publish stats as it's not using the serialized state
|
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
|
||||||
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
|
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
||||||
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
|
||||||
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
|
||||||
} else {
|
|
||||||
// followers receive a diff
|
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
|
|
||||||
postPublishStats.get(cn).getFullClusterStateReceivedCount());
|
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
|
|
||||||
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
|
|
||||||
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
|
|
||||||
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.Strings;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
@ -50,6 +49,7 @@ import java.util.concurrent.TimeoutException;
|
|||||||
|
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
@ -158,21 +158,6 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testDiscoveryStats() throws Exception {
|
public void testDiscoveryStats() throws Exception {
|
||||||
String expectedStatsJsonResponse = "{\n" +
|
|
||||||
" \"discovery\" : {\n" +
|
|
||||||
" \"cluster_state_queue\" : {\n" +
|
|
||||||
" \"total\" : 0,\n" +
|
|
||||||
" \"pending\" : 0,\n" +
|
|
||||||
" \"committed\" : 0\n" +
|
|
||||||
" },\n" +
|
|
||||||
" \"published_cluster_states\" : {\n" +
|
|
||||||
" \"full_states\" : 0,\n" +
|
|
||||||
" \"incompatible_diffs\" : 0,\n" +
|
|
||||||
" \"compatible_diffs\" : 0\n" +
|
|
||||||
" }\n" +
|
|
||||||
" }\n" +
|
|
||||||
"}";
|
|
||||||
|
|
||||||
internalCluster().startNode();
|
internalCluster().startNode();
|
||||||
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
|
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
|
||||||
assertBusy(() ->
|
assertBusy(() ->
|
||||||
@ -190,15 +175,13 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||||||
assertThat(stats.getQueueStats().getPending(), equalTo(0));
|
assertThat(stats.getQueueStats().getPending(), equalTo(0));
|
||||||
|
|
||||||
assertThat(stats.getPublishStats(), notNullValue());
|
assertThat(stats.getPublishStats(), notNullValue());
|
||||||
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
|
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), greaterThanOrEqualTo(0L));
|
||||||
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
|
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
|
||||||
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));
|
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), greaterThanOrEqualTo(0L));
|
||||||
|
|
||||||
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
|
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
|
|
||||||
assertThat(Strings.toString(builder), equalTo(expectedStatsJsonResponse));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -583,7 +583,6 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||||||
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
|
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41067")
|
|
||||||
public void testCollectNodes() throws InterruptedException, IOException {
|
public void testCollectNodes() throws InterruptedException, IOException {
|
||||||
final Settings settings = Settings.EMPTY;
|
final Settings settings = Settings.EMPTY;
|
||||||
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();
|
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();
|
||||||
|
@ -201,7 +201,15 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||||||
portGenerator.set(0);
|
portGenerator.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Allows distinguishing between parallel test processes
|
||||||
|
public static final int TEST_WORKER_VM;
|
||||||
|
|
||||||
|
protected static final String TEST_WORKER_SYS_PROPERTY = "org.gradle.test.worker";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
// org.gradle.test.worker starts counting at 1, but we want to start counting at 0 here
|
||||||
|
// in case system property is not defined (e.g. when running test from IDE), just use 0
|
||||||
|
TEST_WORKER_VM = RandomizedTest.systemPropertyAsInt(TEST_WORKER_SYS_PROPERTY, 1) - 1;
|
||||||
setTestSysProps();
|
setTestSysProps();
|
||||||
LogConfigurator.loadLog4jPlugins();
|
LogConfigurator.loadLog4jPlugins();
|
||||||
|
|
||||||
|
@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
|||||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
||||||
import com.carrotsearch.randomizedtesting.SeedUtils;
|
import com.carrotsearch.randomizedtesting.SeedUtils;
|
||||||
import com.carrotsearch.randomizedtesting.SysGlobals;
|
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
||||||
@ -526,8 +525,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||||||
|
|
||||||
public static String clusterName(String prefix, long clusterSeed) {
|
public static String clusterName(String prefix, long clusterSeed) {
|
||||||
StringBuilder builder = new StringBuilder(prefix);
|
StringBuilder builder = new StringBuilder(prefix);
|
||||||
final int childVM = RandomizedTest.systemPropertyAsInt(SysGlobals.CHILDVM_SYSPROP_JVM_ID, 0);
|
builder.append("-TEST_WORKER_VM=[").append(ESTestCase.TEST_WORKER_VM).append(']');
|
||||||
builder.append("-CHILD_VM=[").append(childVM).append(']');
|
|
||||||
builder.append("-CLUSTER_SEED=[").append(clusterSeed).append(']');
|
builder.append("-CLUSTER_SEED=[").append(clusterSeed).append(']');
|
||||||
// if multiple maven task run on a single host we better have an identifier that doesn't rely on input params
|
// if multiple maven task run on a single host we better have an identifier that doesn't rely on input params
|
||||||
builder.append("-HASH=[").append(SeedUtils.formatSeed(System.nanoTime())).append(']');
|
builder.append("-HASH=[").append(SeedUtils.formatSeed(System.nanoTime())).append(']');
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
package org.elasticsearch.test.transport;
|
package org.elasticsearch.test.transport;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.SysGlobals;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
@ -46,6 +45,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
@ -92,7 +92,6 @@ public final class MockTransportService extends TransportService {
|
|||||||
private static final Logger logger = LogManager.getLogger(MockTransportService.class);
|
private static final Logger logger = LogManager.getLogger(MockTransportService.class);
|
||||||
|
|
||||||
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
|
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
|
||||||
private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0"));
|
|
||||||
|
|
||||||
public static class TestPlugin extends Plugin {
|
public static class TestPlugin extends Plugin {
|
||||||
@Override
|
@Override
|
||||||
@ -112,7 +111,8 @@ public final class MockTransportService extends TransportService {
|
|||||||
// concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might
|
// concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might
|
||||||
// be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use
|
// be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use
|
||||||
// a different default port range per JVM unless the incoming settings override it
|
// a different default port range per JVM unless the incoming settings override it
|
||||||
int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port
|
// use a non-default base port otherwise some cluster in this JVM might reuse a port
|
||||||
|
int basePort = 10300 + (ESTestCase.TEST_WORKER_VM * 100);
|
||||||
settings = Settings.builder().put(TransportSettings.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
|
settings = Settings.builder().put(TransportSettings.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
||||||
return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
|
return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
package org.elasticsearch.test.test;
|
package org.elasticsearch.test.test;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
||||||
import junit.framework.AssertionFailedError;
|
import junit.framework.AssertionFailedError;
|
||||||
|
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
@ -181,4 +182,11 @@ public class ESTestCaseTests extends ESTestCase {
|
|||||||
Supplier<Object> usuallyNull = () -> usually() ? null : randomInt();
|
Supplier<Object> usuallyNull = () -> usually() ? null : randomInt();
|
||||||
assertNotNull(randomValueOtherThan(null, usuallyNull));
|
assertNotNull(randomValueOtherThan(null, usuallyNull));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testWorkerSystemProperty() {
|
||||||
|
assumeTrue("requires running tests with Gradle", System.getProperty("tests.gradle") != null);
|
||||||
|
// org.gradle.test.worker starts counting at 1
|
||||||
|
assertThat(RandomizedTest.systemPropertyAsInt(TEST_WORKER_SYS_PROPERTY, -1), greaterThan(0));
|
||||||
|
assertEquals(RandomizedTest.systemPropertyAsInt(TEST_WORKER_SYS_PROPERTY, -1) - 1, TEST_WORKER_VM);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ package org.elasticsearch.cluster.coordination;
|
|||||||
|
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
@ -80,6 +81,17 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
|
|||||||
equalTo(false));
|
equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testBootstrapOnlySingleVotingOnlyNode() throws Exception {
|
||||||
|
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||||
|
internalCluster().startNode(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true)
|
||||||
|
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build());
|
||||||
|
internalCluster().startNode();
|
||||||
|
assertBusy(() -> assertThat(client().admin().cluster().prepareState().get().getState().getNodes().getSize(), equalTo(2)));
|
||||||
|
assertThat(
|
||||||
|
VotingOnlyNodePlugin.isVotingOnlyNode(client().admin().cluster().prepareState().get().getState().nodes().getMasterNode()),
|
||||||
|
equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
|
public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
|
||||||
internalCluster().setBootstrapMasterNodeIndex(0);
|
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||||
internalCluster().startNode();
|
internalCluster().startNode();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user