Merge remote-tracking branch 'origin/master' into feature/client_aggs_parsing
This commit is contained in:
commit
5fb04fa603
|
@ -554,7 +554,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
|
||||
|
|
|
@ -21,11 +21,13 @@ package org.elasticsearch.search.aggregations.bucket.significant;
|
|||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.DocValueFormat;
|
||||
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -74,7 +76,12 @@ public abstract class InternalMappedSignificantTerms<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<B> getBucketsInternal() {
|
||||
public Iterator<SignificantTerms.Bucket> iterator() {
|
||||
return buckets.stream().map(bucket -> (SignificantTerms.Bucket) bucket).collect(Collectors.toList()).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<B> getBuckets() {
|
||||
return buckets;
|
||||
}
|
||||
|
||||
|
@ -117,4 +124,19 @@ public abstract class InternalMappedSignificantTerms<
|
|||
protected int doHashCode() {
|
||||
return Objects.hash(super.doHashCode(), format, subsetSize, supersetSize, significanceHeuristic, buckets, bucketMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(CommonFields.DOC_COUNT.getPreferredName(), subsetSize);
|
||||
builder.startArray(CommonFields.BUCKETS.getPreferredName());
|
||||
for (Bucket bucket : buckets) {
|
||||
//There is a condition (presumably when only one shard has a bucket?) where reduce is not called
|
||||
// and I end up with buckets that contravene the user's min_doc_count criteria in my reducer
|
||||
if (bucket.subsetDf >= minDocCount) {
|
||||
bucket.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.significant;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.DocValueFormat;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
|
@ -33,20 +34,22 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
||||
/**
|
||||
* Result of the significant terms aggregation.
|
||||
*/
|
||||
public abstract class InternalSignificantTerms<A extends InternalSignificantTerms<A, B>, B extends InternalSignificantTerms.Bucket<B>>
|
||||
extends InternalMultiBucketAggregation<A, B> implements SignificantTerms, ToXContent {
|
||||
|
||||
private static final String SCORE = "score";
|
||||
private static final String BG_COUNT = "bg_count";
|
||||
|
||||
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
|
||||
public abstract static class Bucket<B extends Bucket<B>> extends SignificantTerms.Bucket {
|
||||
public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBucketAggregation.InternalBucket
|
||||
implements SignificantTerms.Bucket {
|
||||
/**
|
||||
* Reads a bucket. Should be a constructor reference.
|
||||
*/
|
||||
|
@ -55,14 +58,21 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
B read(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException;
|
||||
}
|
||||
|
||||
long subsetDf;
|
||||
long subsetSize;
|
||||
long supersetDf;
|
||||
long supersetSize;
|
||||
long bucketOrd;
|
||||
protected InternalAggregations aggregations;
|
||||
double score;
|
||||
protected InternalAggregations aggregations;
|
||||
final transient DocValueFormat format;
|
||||
|
||||
protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
|
||||
InternalAggregations aggregations, DocValueFormat format) {
|
||||
super(subsetDf, subsetSize, supersetDf, supersetSize);
|
||||
this.subsetSize = subsetSize;
|
||||
this.supersetSize = supersetSize;
|
||||
this.subsetDf = subsetDf;
|
||||
this.supersetDf = supersetDf;
|
||||
this.aggregations = aggregations;
|
||||
this.format = format;
|
||||
}
|
||||
|
@ -71,7 +81,8 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
* Read from a stream.
|
||||
*/
|
||||
protected Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) {
|
||||
super(in, subsetSize, supersetSize);
|
||||
this.subsetSize = subsetSize;
|
||||
this.supersetSize = supersetSize;
|
||||
this.format = format;
|
||||
}
|
||||
|
||||
|
@ -95,7 +106,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
return subsetSize;
|
||||
}
|
||||
|
||||
public void updateScore(SignificanceHeuristic significanceHeuristic) {
|
||||
void updateScore(SignificanceHeuristic significanceHeuristic) {
|
||||
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
|
||||
}
|
||||
|
||||
|
@ -149,6 +160,20 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
public int hashCode() {
|
||||
return Objects.hash(getClass(), bucketOrd, aggregations, score, format);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
keyToXContent(builder);
|
||||
builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount());
|
||||
builder.field(SCORE, score);
|
||||
builder.field(BG_COUNT, supersetDf);
|
||||
aggregations.toXContentInternal(builder, params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
protected abstract XContentBuilder keyToXContent(XContentBuilder builder) throws IOException;
|
||||
}
|
||||
|
||||
protected final int requiredSize;
|
||||
|
@ -179,16 +204,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
protected abstract void writeTermTypeInfoTo(StreamOutput out) throws IOException;
|
||||
|
||||
@Override
|
||||
public Iterator<SignificantTerms.Bucket> iterator() {
|
||||
return getBuckets().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SignificantTerms.Bucket> getBuckets() {
|
||||
return unmodifiableList(getBucketsInternal());
|
||||
}
|
||||
|
||||
protected abstract List<B> getBucketsInternal();
|
||||
public abstract List<B> getBuckets();
|
||||
|
||||
@Override
|
||||
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
||||
|
@ -206,7 +222,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
|
|||
for (InternalAggregation aggregation : aggregations) {
|
||||
@SuppressWarnings("unchecked")
|
||||
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
|
||||
for (B bucket : terms.getBucketsInternal()) {
|
||||
for (B bucket : terms.getBuckets()) {
|
||||
List<B> existingBuckets = buckets.get(bucket.getKeyAsString());
|
||||
if (existingBuckets == null) {
|
||||
existingBuckets = new ArrayList<>(aggregations.size());
|
||||
|
|
|
@ -77,7 +77,7 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
|
|||
}
|
||||
|
||||
@Override
|
||||
int compareTerm(SignificantTerms.Bucket other) {
|
||||
public int compareTerm(SignificantTerms.Bucket other) {
|
||||
return Long.compare(term, ((Number) other.getKey()).longValue());
|
||||
}
|
||||
|
||||
|
@ -97,17 +97,11 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
|
|||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
protected XContentBuilder keyToXContent(XContentBuilder builder) throws IOException {
|
||||
builder.field(CommonFields.KEY.getPreferredName(), term);
|
||||
if (format != DocValueFormat.RAW) {
|
||||
builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), format.format(term));
|
||||
}
|
||||
builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount());
|
||||
builder.field("score", score);
|
||||
builder.field("bg_count", supersetDf);
|
||||
aggregations.toXContentInternal(builder, params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
@ -159,17 +153,6 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
|
|||
supersetSize, significanceHeuristic, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("doc_count", subsetSize);
|
||||
builder.startArray(CommonFields.BUCKETS.getPreferredName());
|
||||
for (Bucket bucket : buckets) {
|
||||
bucket.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Bucket[] createBucketsArray(int size) {
|
||||
return new Bucket[size];
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.bucket.significant.heuristics.Signi
|
|||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -82,7 +83,7 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
|
|||
}
|
||||
|
||||
@Override
|
||||
int compareTerm(SignificantTerms.Bucket other) {
|
||||
public int compareTerm(SignificantTerms.Bucket other) {
|
||||
return termBytes.compareTo(((Bucket) other).termBytes);
|
||||
}
|
||||
|
||||
|
@ -102,15 +103,8 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
|
|||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString());
|
||||
builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount());
|
||||
builder.field("score", score);
|
||||
builder.field("bg_count", supersetDf);
|
||||
aggregations.toXContentInternal(builder, params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
protected XContentBuilder keyToXContent(XContentBuilder builder) throws IOException {
|
||||
return builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -161,21 +155,6 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
|
|||
supersetSize, significanceHeuristic, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("doc_count", subsetSize);
|
||||
builder.startArray(CommonFields.BUCKETS.getPreferredName());
|
||||
for (Bucket bucket : buckets) {
|
||||
//There is a condition (presumably when only one shard has a bucket?) where reduce is not called
|
||||
// and I end up with buckets that contravene the user's min_doc_count criteria in my reducer
|
||||
if (bucket.subsetDf >= minDocCount) {
|
||||
bucket.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Bucket[] createBucketsArray(int size) {
|
||||
return new Bucket[size];
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.significant;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -28,54 +26,26 @@ import java.util.List;
|
|||
* An aggregation that collects significant terms in comparison to a background set.
|
||||
*/
|
||||
public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> {
|
||||
abstract class Bucket extends InternalMultiBucketAggregation.InternalBucket {
|
||||
|
||||
long subsetDf;
|
||||
long subsetSize;
|
||||
long supersetDf;
|
||||
long supersetSize;
|
||||
interface Bucket extends MultiBucketsAggregation.Bucket {
|
||||
|
||||
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
|
||||
this.subsetSize = subsetSize;
|
||||
this.supersetSize = supersetSize;
|
||||
this.subsetDf = subsetDf;
|
||||
this.supersetDf = supersetDf;
|
||||
}
|
||||
double getSignificanceScore();
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
protected Bucket(StreamInput in, long subsetSize, long supersetSize) {
|
||||
this.subsetSize = subsetSize;
|
||||
this.supersetSize = supersetSize;
|
||||
}
|
||||
Number getKeyAsNumber();
|
||||
|
||||
abstract int compareTerm(SignificantTerms.Bucket other);
|
||||
long getSubsetDf();
|
||||
|
||||
public abstract double getSignificanceScore();
|
||||
long getSupersetDf();
|
||||
|
||||
abstract Number getKeyAsNumber();
|
||||
long getSupersetSize();
|
||||
|
||||
public long getSubsetDf() {
|
||||
return subsetDf;
|
||||
}
|
||||
|
||||
public long getSupersetDf() {
|
||||
return supersetDf;
|
||||
}
|
||||
|
||||
public long getSupersetSize() {
|
||||
return supersetSize;
|
||||
}
|
||||
|
||||
public long getSubsetSize() {
|
||||
return subsetSize;
|
||||
}
|
||||
long getSubsetSize();
|
||||
|
||||
int compareTerm(SignificantTerms.Bucket other);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<Bucket> getBuckets();
|
||||
List<? extends Bucket> getBuckets();
|
||||
|
||||
/**
|
||||
* Get the bucket for the given term, or null if there is no such bucket.
|
||||
|
|
|
@ -31,15 +31,18 @@ import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
|
|||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyIterator;
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
/**
|
||||
* Result of the running the significant terms aggregation on an unmapped field.
|
||||
*/
|
||||
public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedSignificantTerms, UnmappedSignificantTerms.Bucket> {
|
||||
|
||||
public static final String NAME = "umsigterms";
|
||||
|
||||
/**
|
||||
|
@ -117,7 +120,12 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<Bucket> getBucketsInternal() {
|
||||
public Iterator<SignificantTerms.Bucket> iterator() {
|
||||
return emptyIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Bucket> getBuckets() {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,300 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.DisruptedLinks;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
|
||||
|
||||
static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
|
||||
|
||||
private ClusterDiscoveryConfiguration discoveryConfig;
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void clearConfig() {
|
||||
discoveryConfig = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfShards() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfReplicas() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
private boolean disableBeforeIndexDeletion;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
disableBeforeIndexDeletion = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
|
||||
if (scheme instanceof NetworkDisruption &&
|
||||
((NetworkDisruption) scheme).getNetworkLinkDisruptionType() instanceof NetworkDisruption.NetworkUnresponsive) {
|
||||
// the network unresponsive disruption may leave operations in flight
|
||||
// this is because this disruption scheme swallows requests by design
|
||||
// as such, these operations will never be marked as finished
|
||||
disableBeforeIndexDeletion = true;
|
||||
}
|
||||
super.setDisruptionScheme(scheme);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeIndexDeletion() throws Exception {
|
||||
if (disableBeforeIndexDeletion == false) {
|
||||
super.beforeIndexDeletion();
|
||||
}
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
|
||||
return startCluster(numberOfNodes, -1);
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
|
||||
return startCluster(numberOfNodes, minimumMasterNode, null);
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
|
||||
ExecutionException, InterruptedException {
|
||||
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
List<String> nodes = internalCluster().startNodes(numberOfNodes);
|
||||
ensureStableCluster(numberOfNodes);
|
||||
|
||||
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
|
||||
static final Settings DEFAULT_SETTINGS = Settings.builder()
|
||||
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
|
||||
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
|
||||
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
|
||||
.put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
|
||||
// value and the time of disruption and does not recover immediately
|
||||
// when disruption is stop. We should make sure we recover faster
|
||||
// then the default of 30s, causing ensureGreen and friends to time out
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
void configureCluster(
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
int minimumMasterNode
|
||||
) throws ExecutionException, InterruptedException {
|
||||
configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
}
|
||||
|
||||
void configureCluster(
|
||||
Settings settings,
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
int minimumMasterNode
|
||||
) throws ExecutionException, InterruptedException {
|
||||
if (minimumMasterNode < 0) {
|
||||
minimumMasterNode = numberOfNodes / 2 + 1;
|
||||
}
|
||||
logger.info("---> configured unicast");
|
||||
// TODO: Rarely use default settings form some of these
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(settings)
|
||||
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
|
||||
.build();
|
||||
|
||||
if (discoveryConfig == null) {
|
||||
if (unicastHostsOrdinals == null) {
|
||||
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
|
||||
} else {
|
||||
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterState getNodeClusterState(String node) {
|
||||
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node) throws Exception {
|
||||
assertNoMaster(node, null, TimeValue.timeValueSeconds(10));
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exception {
|
||||
assertNoMaster(node, null, maxWaitTime);
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
final DiscoveryNodes nodes = state.nodes();
|
||||
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
|
||||
if (expectedBlocks != null) {
|
||||
for (ClusterBlockLevel level : expectedBlocks.levels()) {
|
||||
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
|
||||
(level));
|
||||
}
|
||||
}
|
||||
}
|
||||
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
String masterNode = null;
|
||||
if (state.nodes().getMasterNode() != null) {
|
||||
masterNode = state.nodes().getMasterNode().getName();
|
||||
}
|
||||
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
|
||||
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
|
||||
oldMasterNode, not(equalTo(masterNode)));
|
||||
}
|
||||
}, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
void assertMaster(String masterNode, List<String> nodes) throws Exception {
|
||||
assertBusy(() -> {
|
||||
for (String node : nodes) {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
String failMsgSuffix = "cluster_state:\n" + state;
|
||||
assertThat("wrong node count on [" + node + "]. " + failMsgSuffix, state.nodes().getSize(), equalTo(nodes.size()));
|
||||
String otherMasterNodeName = state.nodes().getMasterNode() != null ? state.nodes().getMasterNode().getName() : null;
|
||||
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, otherMasterNodeName, equalTo(masterNode));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ServiceDisruptionScheme addRandomDisruptionScheme() {
|
||||
// TODO: add partial partitions
|
||||
NetworkDisruption p;
|
||||
final DisruptedLinks disruptedLinks;
|
||||
if (randomBoolean()) {
|
||||
disruptedLinks = TwoPartitions.random(random(), internalCluster().getNodeNames());
|
||||
} else {
|
||||
disruptedLinks = Bridge.random(random(), internalCluster().getNodeNames());
|
||||
}
|
||||
final NetworkLinkDisruptionType disruptionType;
|
||||
switch (randomInt(2)) {
|
||||
case 0:
|
||||
disruptionType = new NetworkDisruption.NetworkUnresponsive();
|
||||
break;
|
||||
case 1:
|
||||
disruptionType = new NetworkDisconnect();
|
||||
break;
|
||||
case 2:
|
||||
disruptionType = NetworkDisruption.NetworkDelay.random(random());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
final ServiceDisruptionScheme scheme;
|
||||
if (rarely()) {
|
||||
scheme = new SlowClusterStateProcessing(random());
|
||||
} else {
|
||||
scheme = new NetworkDisruption(disruptedLinks, disruptionType);
|
||||
}
|
||||
setDisruptionScheme(scheme);
|
||||
return scheme;
|
||||
}
|
||||
|
||||
NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
|
||||
final NetworkLinkDisruptionType disruptionType;
|
||||
if (randomBoolean()) {
|
||||
disruptionType = new NetworkDisruption.NetworkUnresponsive();
|
||||
} else {
|
||||
disruptionType = new NetworkDisconnect();
|
||||
}
|
||||
NetworkDisruption partition = new NetworkDisruption(partitions, disruptionType);
|
||||
|
||||
setDisruptionScheme(partition);
|
||||
|
||||
return partition;
|
||||
}
|
||||
|
||||
TwoPartitions isolateNode(String isolatedNode) {
|
||||
Set<String> side1 = new HashSet<>();
|
||||
Set<String> side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
|
||||
side1.add(isolatedNode);
|
||||
side2.remove(isolatedNode);
|
||||
|
||||
return new TwoPartitions(side1, side2);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,452 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* Tests various cluster operations (e.g., indexing) during disruptions.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
/**
|
||||
* Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme
|
||||
* We also collect & report the type of indexing failures that occur.
|
||||
* <p>
|
||||
* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
|
||||
*/
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE," +
|
||||
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
|
||||
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
|
||||
public void testAckedIndexing() throws Exception {
|
||||
|
||||
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
|
||||
final String timeout = seconds + "s";
|
||||
|
||||
final List<String> nodes = startCluster(rarely() ? 5 : 3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
||||
));
|
||||
ensureGreen();
|
||||
|
||||
ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
|
||||
logger.info("disruption scheme [{}] added", disruptionScheme);
|
||||
|
||||
final ConcurrentHashMap<String, String> ackedDocs = new ConcurrentHashMap<>(); // id -> node sent.
|
||||
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
List<Thread> indexers = new ArrayList<>(nodes.size());
|
||||
List<Semaphore> semaphores = new ArrayList<>(nodes.size());
|
||||
final AtomicInteger idGenerator = new AtomicInteger(0);
|
||||
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
|
||||
final List<Exception> exceptedExceptions = Collections.synchronizedList(new ArrayList<Exception>());
|
||||
|
||||
logger.info("starting indexers");
|
||||
try {
|
||||
for (final String node : nodes) {
|
||||
final Semaphore semaphore = new Semaphore(0);
|
||||
semaphores.add(semaphore);
|
||||
final Client client = client(node);
|
||||
final String name = "indexer_" + indexers.size();
|
||||
final int numPrimaries = getNumShards("test").numPrimaries;
|
||||
Thread thread = new Thread(() -> {
|
||||
while (!stop.get()) {
|
||||
String id = null;
|
||||
try {
|
||||
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
|
||||
continue;
|
||||
}
|
||||
logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
|
||||
try {
|
||||
id = Integer.toString(idGenerator.incrementAndGet());
|
||||
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
|
||||
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
|
||||
IndexResponse response =
|
||||
client.prepareIndex("test", "type", id)
|
||||
.setSource("{}", XContentType.JSON)
|
||||
.setTimeout(timeout)
|
||||
.get(timeout);
|
||||
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
|
||||
ackedDocs.put(id, node);
|
||||
logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
|
||||
} catch (ElasticsearchException e) {
|
||||
exceptedExceptions.add(e);
|
||||
final String docId = id;
|
||||
logger.trace(
|
||||
(Supplier<?>)
|
||||
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
|
||||
} finally {
|
||||
countDownLatchRef.get().countDown();
|
||||
logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// fine - semaphore interrupt
|
||||
} catch (AssertionError | Exception e) {
|
||||
logger.info(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("unexpected exception in background thread of [{}]", node),
|
||||
e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
thread.setName(name);
|
||||
thread.start();
|
||||
indexers.add(thread);
|
||||
}
|
||||
|
||||
int docsPerIndexer = randomInt(3);
|
||||
logger.info("indexing {} docs per indexer before partition", docsPerIndexer);
|
||||
countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
|
||||
for (Semaphore semaphore : semaphores) {
|
||||
semaphore.release(docsPerIndexer);
|
||||
}
|
||||
assertTrue(countDownLatchRef.get().await(1, TimeUnit.MINUTES));
|
||||
|
||||
for (int iter = 1 + randomInt(2); iter > 0; iter--) {
|
||||
logger.info("starting disruptions & indexing (iteration [{}])", iter);
|
||||
disruptionScheme.startDisrupting();
|
||||
|
||||
docsPerIndexer = 1 + randomInt(5);
|
||||
logger.info("indexing {} docs per indexer during partition", docsPerIndexer);
|
||||
countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
|
||||
Collections.shuffle(semaphores, random());
|
||||
for (Semaphore semaphore : semaphores) {
|
||||
assertThat(semaphore.availablePermits(), equalTo(0));
|
||||
semaphore.release(docsPerIndexer);
|
||||
}
|
||||
logger.info("waiting for indexing requests to complete");
|
||||
assertTrue(countDownLatchRef.get().await(docsPerIndexer * seconds * 1000 + 2000, TimeUnit.MILLISECONDS));
|
||||
|
||||
logger.info("stopping disruption");
|
||||
disruptionScheme.stopDisrupting();
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
|
||||
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
|
||||
}
|
||||
// in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the master
|
||||
// is the super-connected node and recovery source and target are on opposite sides of the bridge
|
||||
if (disruptionScheme instanceof NetworkDisruption &&
|
||||
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
|
||||
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
|
||||
}
|
||||
ensureGreen("test");
|
||||
|
||||
logger.info("validating successful docs");
|
||||
assertBusy(() -> {
|
||||
for (String node : nodes) {
|
||||
try {
|
||||
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
|
||||
for (String id : ackedDocs.keySet()) {
|
||||
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
|
||||
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
|
||||
}
|
||||
} catch (AssertionError | NoShardAvailableActionException e) {
|
||||
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
|
||||
logger.info("done validating (iteration [{}])", iter);
|
||||
}
|
||||
} finally {
|
||||
if (exceptedExceptions.size() > 0) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Exception e : exceptedExceptions) {
|
||||
sb.append("\n").append(e.getMessage());
|
||||
}
|
||||
logger.debug("Indexing exceptions during disruption: {}", sb);
|
||||
}
|
||||
logger.info("shutting down indexers");
|
||||
stop.set(true);
|
||||
for (Thread indexer : indexers) {
|
||||
indexer.interrupt();
|
||||
indexer.join(60000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a document which is indexed on the majority side of a partition, is available from the minority side,
|
||||
* once the partition is healed
|
||||
*/
|
||||
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
|
||||
List<String> nodes = startCluster(3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
)
|
||||
.get());
|
||||
ensureGreen("test");
|
||||
|
||||
nodes = new ArrayList<>(nodes);
|
||||
Collections.shuffle(nodes, random());
|
||||
String isolatedNode = nodes.get(0);
|
||||
String notIsolatedNode = nodes.get(1);
|
||||
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
NetworkDisruption scheme = addRandomDisruptionType(partitions);
|
||||
scheme.startDisrupting();
|
||||
ensureStableCluster(2, notIsolatedNode);
|
||||
assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
|
||||
|
||||
|
||||
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
|
||||
.get();
|
||||
assertThat(indexResponse.getVersion(), equalTo(1L));
|
||||
|
||||
logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
|
||||
GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId())
|
||||
.setPreference("_local")
|
||||
.get();
|
||||
assertThat(getResponse.isExists(), is(true));
|
||||
assertThat(getResponse.getVersion(), equalTo(1L));
|
||||
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
||||
|
||||
scheme.stopDisrupting();
|
||||
|
||||
ensureStableCluster(3);
|
||||
ensureGreen("test");
|
||||
|
||||
for (String node : nodes) {
|
||||
logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node);
|
||||
getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId())
|
||||
.setPreference("_local")
|
||||
.get();
|
||||
assertThat(getResponse.isExists(), is(true));
|
||||
assertThat(getResponse.getVersion(), equalTo(1L));
|
||||
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
||||
}
|
||||
}
|
||||
|
||||
// simulate handling of sending shard failure during an isolation
|
||||
public void testSendingShardFailure() throws Exception {
|
||||
List<String> nodes = startCluster(3, 2);
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
|
||||
String nonMasterNode = randomFrom(nonMasterNodes);
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
));
|
||||
ensureGreen();
|
||||
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
|
||||
|
||||
// fail a random shard
|
||||
ShardRouting failedShard =
|
||||
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
|
||||
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicBoolean success = new AtomicBoolean();
|
||||
|
||||
String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
// we cannot use the NetworkUnresponsive disruption type here as it will swallow the "shard failed" request, calling neither
|
||||
// onSuccess nor onFailure on the provided listener.
|
||||
NetworkLinkDisruptionType disruptionType = new NetworkDisconnect();
|
||||
NetworkDisruption networkDisruption = new NetworkDisruption(partitions, disruptionType);
|
||||
setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
|
||||
ShardStateAction.Listener() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
success.set(true);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
success.set(false);
|
||||
latch.countDown();
|
||||
assert false;
|
||||
}
|
||||
});
|
||||
|
||||
if (isolatedNode.equals(nonMasterNode)) {
|
||||
assertNoMaster(nonMasterNode);
|
||||
} else {
|
||||
ensureStableCluster(2, nonMasterNode);
|
||||
}
|
||||
|
||||
// heal the partition
|
||||
networkDisruption.removeAndEnsureHealthy(internalCluster());
|
||||
|
||||
// the cluster should stabilize
|
||||
ensureStableCluster(3);
|
||||
|
||||
latch.await();
|
||||
|
||||
// the listener should be notified
|
||||
assertTrue(success.get());
|
||||
|
||||
// the failed shard should be gone
|
||||
List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
|
||||
for (ShardRouting shard : shards) {
|
||||
assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
|
||||
* node but already deleted on the source node. Search request should still work.
|
||||
*/
|
||||
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
|
||||
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
|
||||
configureCluster(Settings.EMPTY, 3, null, 1);
|
||||
final String masterNode = internalCluster().startMasterOnlyNode();
|
||||
final String node_1 = internalCluster().startDataOnlyNode();
|
||||
|
||||
logger.info("--> creating index [test] with one shard and on replica");
|
||||
assertAcked(prepareCreate("test").setSettings(
|
||||
Settings.builder().put(indexSettings())
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
|
||||
);
|
||||
ensureGreen("test");
|
||||
|
||||
final String node_2 = internalCluster().startDataOnlyNode();
|
||||
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc")
|
||||
.setSource("{\"int_field\":1}", XContentType.JSON));
|
||||
}
|
||||
indexRandom(true, indexRequestBuilderList);
|
||||
|
||||
IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
|
||||
// now search for the documents and see if we get a reply
|
||||
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
|
||||
}
|
||||
|
||||
public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
|
||||
// test for https://github.com/elastic/elasticsearch/issues/8823
|
||||
configureCluster(2, null, 1);
|
||||
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
|
||||
internalCluster().startDataOnlyNode(Settings.EMPTY);
|
||||
|
||||
ensureStableCluster(2);
|
||||
assertAcked(prepareCreate("index").setSettings(Settings.builder().put("index.number_of_replicas", 0)));
|
||||
index("index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
|
||||
ensureGreen();
|
||||
|
||||
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
|
||||
@Override
|
||||
public boolean clearData(String nodeName) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
ensureGreen("index");
|
||||
assertTrue(client().prepareGet("index", "doc", "1").get().isExists());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that indices are properly deleted even if there is a master transition in between.
|
||||
* Test for https://github.com/elastic/elasticsearch/issues/11665
|
||||
*/
|
||||
public void testIndicesDeleted() throws Exception {
|
||||
final Settings settings = Settings.builder()
|
||||
.put(DEFAULT_SETTINGS)
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
|
||||
.build();
|
||||
final String idxName = "test";
|
||||
configureCluster(settings, 3, null, 2);
|
||||
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
|
||||
final String dataNode = internalCluster().startDataOnlyNode();
|
||||
ensureStableCluster(3);
|
||||
assertAcked(prepareCreate("test"));
|
||||
|
||||
final String masterNode1 = internalCluster().getMasterName();
|
||||
NetworkDisruption networkDisruption =
|
||||
new NetworkDisruption(new TwoPartitions(masterNode1, dataNode), new NetworkDisruption.NetworkUnresponsive());
|
||||
internalCluster().setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
// We know this will time out due to the partition, we check manually below to not proceed until
|
||||
// the delete has been applied to the master node and the master eligible node.
|
||||
internalCluster().client(masterNode1).admin().indices().prepareDelete(idxName).setTimeout("0s").get();
|
||||
// Don't restart the master node until we know the index deletion has taken effect on master and the master eligible node.
|
||||
assertBusy(() -> {
|
||||
for (String masterNode : allMasterEligibleNodes) {
|
||||
final ClusterState masterState = internalCluster().clusterService(masterNode).state();
|
||||
assertTrue("index not deleted on " + masterNode, masterState.metaData().hasIndex(idxName) == false);
|
||||
}
|
||||
});
|
||||
internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK);
|
||||
ensureYellow();
|
||||
assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,320 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.MembershipAction;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
/**
|
||||
* Tests for discovery during disruptions.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
public void testIsolatedUnicastNodes() throws Exception {
|
||||
List<String> nodes = startCluster(4, -1, new int[]{0});
|
||||
// Figure out what is the elected master node
|
||||
final String unicastTarget = nodes.get(0);
|
||||
|
||||
Set<String> unicastTargetSide = new HashSet<>();
|
||||
unicastTargetSide.add(unicastTarget);
|
||||
|
||||
Set<String> restOfClusterSide = new HashSet<>();
|
||||
restOfClusterSide.addAll(nodes);
|
||||
restOfClusterSide.remove(unicastTarget);
|
||||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unicast target node and the rest of the cluster
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide),
|
||||
new NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(3, nodes.get(1));
|
||||
|
||||
// The isolate master node must report no master, so it starts with pinging
|
||||
assertNoMaster(unicastTarget);
|
||||
networkDisconnect.stopDisrupting();
|
||||
// Wait until the master node sees all 3 nodes again.
|
||||
ensureStableCluster(4);
|
||||
}
|
||||
|
||||
/**
|
||||
* A 4 node cluster with m_m_n set to 3 and each node has one unicast endpoint. One node partitions from the master node.
|
||||
* The temporal unicast responses is empty. When partition is solved the one ping response contains a master node.
|
||||
* The rejoining node should take this master node and connect.
|
||||
*/
|
||||
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
|
||||
List<String> nodes = startCluster(4, -1, new int[]{0});
|
||||
// Figure out what is the elected master node
|
||||
final String masterNode = internalCluster().getMasterName();
|
||||
logger.info("---> legit elected master node={}", masterNode);
|
||||
List<String> otherNodes = new ArrayList<>(nodes);
|
||||
otherNodes.remove(masterNode);
|
||||
otherNodes.remove(nodes.get(0)); // <-- Don't isolate the node that is in the unicast endpoint for all the other nodes.
|
||||
final String isolatedNode = otherNodes.get(0);
|
||||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unlucky node and elected master node in both directions.
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode),
|
||||
new NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(3, masterNode);
|
||||
|
||||
// The isolate master node must report no master, so it starts with pinging
|
||||
assertNoMaster(isolatedNode);
|
||||
networkDisconnect.stopDisrupting();
|
||||
// Wait until the master node sees all 4 nodes again.
|
||||
ensureStableCluster(4);
|
||||
// The elected master shouldn't have changed, since the isolated node never could have elected himself as
|
||||
// master since m_m_n of 3 could never be satisfied.
|
||||
assertMaster(masterNode, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test cluster join with issues in cluster state publishing *
|
||||
*/
|
||||
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
|
||||
List<String> nodes = startCluster(2, 1);
|
||||
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
String nonMasterNode;
|
||||
if (masterNode.equals(nodes.get(0))) {
|
||||
nonMasterNode = nodes.get(1);
|
||||
} else {
|
||||
nonMasterNode = nodes.get(0);
|
||||
}
|
||||
|
||||
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
|
||||
|
||||
TransportService masterTranspotService =
|
||||
internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
|
||||
|
||||
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
|
||||
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
||||
nonMasterNode);
|
||||
nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
|
||||
|
||||
assertNoMaster(nonMasterNode);
|
||||
|
||||
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
|
||||
MockTransportService masterTransportService =
|
||||
(MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
|
||||
TransportService localTransportService =
|
||||
internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName());
|
||||
if (randomBoolean()) {
|
||||
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
|
||||
} else {
|
||||
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
|
||||
}
|
||||
|
||||
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(2);
|
||||
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
|
||||
.original()) {
|
||||
@Override
|
||||
protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
super.sendRequest(connection, requestId, action, request, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
return super.openConnection(node, profile);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
countDownLatch.await();
|
||||
|
||||
logger.info("waiting for cluster to reform");
|
||||
masterTransportService.clearRule(localTransportService);
|
||||
nonMasterTransportService.clearRule(localTransportService);
|
||||
|
||||
ensureStableCluster(2);
|
||||
|
||||
// shutting down the nodes, to avoid the leakage check tripping
|
||||
// on the states associated with the commit requests we may have dropped
|
||||
internalCluster().stopRandomNonMasterNode();
|
||||
}
|
||||
|
||||
public void testClusterFormingWithASlowNode() throws Exception {
|
||||
configureCluster(3, null, 2);
|
||||
|
||||
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
|
||||
|
||||
// don't wait for initial state, we want to add the disruption while the cluster is forming
|
||||
internalCluster().startNodes(3, Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s").build());
|
||||
|
||||
logger.info("applying disruption while cluster is forming ...");
|
||||
|
||||
internalCluster().setDisruptionScheme(disruption);
|
||||
disruption.startDisrupting();
|
||||
|
||||
ensureStableCluster(3);
|
||||
}
|
||||
|
||||
public void testElectMasterWithLatestVersion() throws Exception {
|
||||
configureCluster(3, null, 2);
|
||||
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
|
||||
ensureStableCluster(3);
|
||||
ServiceDisruptionScheme isolateAllNodes =
|
||||
new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node : nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
internalCluster().clearDisruptionScheme();
|
||||
ensureStableCluster(3);
|
||||
final String preferredMasterName = internalCluster().getMasterName();
|
||||
final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode();
|
||||
assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId()));
|
||||
}
|
||||
|
||||
logger.info("--> preferred master is {}", preferredMaster);
|
||||
final Set<String> nonPreferredNodes = new HashSet<>(nodes);
|
||||
nonPreferredNodes.remove(preferredMasterName);
|
||||
final ServiceDisruptionScheme isolatePreferredMaster =
|
||||
new NetworkDisruption(
|
||||
new NetworkDisruption.TwoPartitions(
|
||||
Collections.singleton(preferredMasterName), nonPreferredNodes),
|
||||
new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolatePreferredMaster);
|
||||
isolatePreferredMaster.startDisrupting();
|
||||
|
||||
assertAcked(client(randomFrom(nonPreferredNodes)).admin().indices().prepareCreate("test").setSettings(
|
||||
INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1,
|
||||
INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0
|
||||
));
|
||||
|
||||
internalCluster().clearDisruptionScheme(false);
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election again");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node : nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
|
||||
isolateAllNodes.stopDisrupting();
|
||||
|
||||
final ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
if (state.metaData().hasIndex("test") == false) {
|
||||
fail("index 'test' was lost. current cluster state: " + state);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an asymmetric break between a master and one of the nodes and makes
|
||||
* sure that the node is removed form the cluster, that the node start pinging and that
|
||||
* the cluster reforms when healed.
|
||||
*/
|
||||
public void testNodeNotReachableFromMaster() throws Exception {
|
||||
startCluster(3);
|
||||
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
String nonMasterNode = null;
|
||||
while (nonMasterNode == null) {
|
||||
nonMasterNode = randomFrom(internalCluster().getNodeNames());
|
||||
if (nonMasterNode.equals(masterNode)) {
|
||||
nonMasterNode = null;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
|
||||
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
||||
masterNode);
|
||||
if (randomBoolean()) {
|
||||
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
|
||||
} else {
|
||||
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
|
||||
}
|
||||
|
||||
logger.info("waiting for [{}] to be removed from cluster", nonMasterNode);
|
||||
ensureStableCluster(2, masterNode);
|
||||
|
||||
logger.info("waiting for [{}] to have no master", nonMasterNode);
|
||||
assertNoMaster(nonMasterNode);
|
||||
|
||||
logger.info("healing partition and checking cluster reforms");
|
||||
masterTransportService.clearAllRules();
|
||||
|
||||
ensureStableCluster(3);
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,466 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.LongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
* Tests relating to the loss of the master.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class MasterDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
/**
|
||||
* Test that no split brain occurs under partial network partition. See https://github.com/elastic/elasticsearch/issues/2488
|
||||
*/
|
||||
public void testFailWithMinimumMasterNodesConfigured() throws Exception {
|
||||
List<String> nodes = startCluster(3);
|
||||
|
||||
// Figure out what is the elected master node
|
||||
final String masterNode = internalCluster().getMasterName();
|
||||
logger.info("---> legit elected master node={}", masterNode);
|
||||
|
||||
// Pick a node that isn't the elected master.
|
||||
Set<String> nonMasters = new HashSet<>(nodes);
|
||||
nonMasters.remove(masterNode);
|
||||
final String unluckyNode = randomFrom(nonMasters.toArray(Strings.EMPTY_ARRAY));
|
||||
|
||||
|
||||
// Simulate a network issue between the unlucky node and elected master node in both directions.
|
||||
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(
|
||||
new NetworkDisruption.TwoPartitions(masterNode, unluckyNode),
|
||||
new NetworkDisruption.NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(2, masterNode);
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
assertNoMaster(unluckyNode);
|
||||
|
||||
networkDisconnect.stopDisrupting();
|
||||
|
||||
// Wait until the master node sees all 3 nodes again.
|
||||
ensureStableCluster(3);
|
||||
|
||||
// The elected master shouldn't have changed, since the unlucky node never could have elected himself as
|
||||
// master since m_m_n of 2 could never be satisfied.
|
||||
assertMaster(masterNode, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that nodes fault detection works after master (re) election
|
||||
*/
|
||||
public void testNodesFDAfterMasterReelection() throws Exception {
|
||||
startCluster(4);
|
||||
|
||||
logger.info("--> stopping current master");
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
|
||||
ensureStableCluster(3);
|
||||
|
||||
logger.info("--> reducing min master nodes to 2");
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
|
||||
.get());
|
||||
|
||||
String master = internalCluster().getMasterName();
|
||||
String nonMaster = null;
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
if (!node.equals(master)) {
|
||||
nonMaster = node;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("--> isolating [{}]", nonMaster);
|
||||
NetworkDisruption.TwoPartitions partitions = isolateNode(nonMaster);
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
logger.info("--> waiting for master to remove it");
|
||||
ensureStableCluster(2, master);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that emulates a frozen elected master node that unfreezes and pushes his cluster state to other nodes
|
||||
* that already are following another elected master node. These nodes should reject this cluster state and prevent
|
||||
* them from following the stale master.
|
||||
*/
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
|
||||
public void testStaleMasterNotHijackingMajority() throws Exception {
|
||||
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
|
||||
final List<String> nodes = startCluster(3, 2);
|
||||
|
||||
// Save the current master node as old master node, because that node will get frozen
|
||||
final String oldMasterNode = internalCluster().getMasterName();
|
||||
for (String node : nodes) {
|
||||
ensureStableCluster(3, node);
|
||||
}
|
||||
assertMaster(oldMasterNode, nodes);
|
||||
|
||||
// Simulating a painful gc by suspending all threads for a long time on the current elected master node.
|
||||
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(random(), oldMasterNode);
|
||||
|
||||
// Save the majority side
|
||||
final List<String> majoritySide = new ArrayList<>(nodes);
|
||||
majoritySide.remove(oldMasterNode);
|
||||
|
||||
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
|
||||
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
|
||||
String>>>());
|
||||
for (final String node : majoritySide) {
|
||||
masters.put(node, new ArrayList<Tuple<String, String>>());
|
||||
internalCluster().getInstance(ClusterService.class, node).addListener(event -> {
|
||||
DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
|
||||
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
|
||||
if (!Objects.equals(previousMaster, currentMaster)) {
|
||||
logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
|
||||
event.previousState());
|
||||
String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
|
||||
String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
|
||||
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final CountDownLatch oldMasterNodeSteppedDown = new CountDownLatch(1);
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).addListener(event -> {
|
||||
if (event.state().nodes().getMasterNodeId() == null) {
|
||||
oldMasterNodeSteppedDown.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
logger.info("freezing node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
// Wait for the majority side to get stable
|
||||
assertDifferentMaster(majoritySide.get(0), oldMasterNode);
|
||||
assertDifferentMaster(majoritySide.get(1), oldMasterNode);
|
||||
|
||||
// the test is periodically tripping on the following assertion. To find out which threads are blocking the nodes from making
|
||||
// progress we print a stack dump
|
||||
boolean failed = true;
|
||||
try {
|
||||
assertDiscoveryCompleted(majoritySide);
|
||||
failed = false;
|
||||
} finally {
|
||||
if (failed) {
|
||||
logger.error("discovery failed to complete, probably caused by a blocked thread: {}",
|
||||
new HotThreads().busiestThreads(Integer.MAX_VALUE).ignoreIdleThreads(false).detect());
|
||||
}
|
||||
}
|
||||
|
||||
// The old master node is frozen, but here we submit a cluster state update task that doesn't get executed,
|
||||
// but will be queued and once the old master node un-freezes it gets executed.
|
||||
// The old master node will send this update + the cluster state where he is flagged as master to the other
|
||||
// nodes that follow the new master. These nodes should ignore this update.
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
|
||||
ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return ClusterState.builder(currentState).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
|
||||
}
|
||||
});
|
||||
|
||||
// Save the new elected master node
|
||||
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
|
||||
logger.info("new detected master node [{}]", newMasterNode);
|
||||
|
||||
// Stop disruption
|
||||
logger.info("Unfreeze node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
|
||||
oldMasterNodeSteppedDown.await(30, TimeUnit.SECONDS);
|
||||
// Make sure that the end state is consistent on all nodes:
|
||||
assertDiscoveryCompleted(nodes);
|
||||
assertMaster(newMasterNode, nodes);
|
||||
|
||||
assertThat(masters.size(), equalTo(2));
|
||||
for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
|
||||
String nodeName = entry.getKey();
|
||||
List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
|
||||
assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
|
||||
equalTo(2));
|
||||
assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
|
||||
equalTo(oldMasterNode));
|
||||
assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
|
||||
.get(0).v2(), nullValue());
|
||||
assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
|
||||
nullValue());
|
||||
assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
|
||||
recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
|
||||
*/
|
||||
public void testMasterNodeGCs() throws Exception {
|
||||
List<String> nodes = startCluster(3, -1);
|
||||
|
||||
String oldMasterNode = internalCluster().getMasterName();
|
||||
// a very long GC, but it's OK as we remove the disruption when it has had an effect
|
||||
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), oldMasterNode, 100, 200, 30000, 60000);
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
Set<String> oldNonMasterNodesSet = new HashSet<>(nodes);
|
||||
oldNonMasterNodesSet.remove(oldMasterNode);
|
||||
|
||||
List<String> oldNonMasterNodes = new ArrayList<>(oldNonMasterNodesSet);
|
||||
|
||||
logger.info("waiting for nodes to de-elect master [{}]", oldMasterNode);
|
||||
for (String node : oldNonMasterNodesSet) {
|
||||
assertDifferentMaster(node, oldMasterNode);
|
||||
}
|
||||
|
||||
logger.info("waiting for nodes to elect a new master");
|
||||
ensureStableCluster(2, oldNonMasterNodes.get(0));
|
||||
|
||||
logger.info("waiting for any pinging to stop");
|
||||
assertDiscoveryCompleted(oldNonMasterNodes);
|
||||
|
||||
// restore GC
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
final TimeValue waitTime = new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis());
|
||||
ensureStableCluster(3, waitTime, false, oldNonMasterNodes.get(0));
|
||||
|
||||
// make sure all nodes agree on master
|
||||
String newMaster = internalCluster().getMasterName();
|
||||
assertThat(newMaster, not(equalTo(oldMasterNode)));
|
||||
assertMaster(newMaster, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test isolates the master from rest of the cluster, waits for a new master to be elected, restores the partition
|
||||
* and verifies that all node agree on the new cluster state
|
||||
*/
|
||||
@TestLogging(
|
||||
"_root:DEBUG,"
|
||||
+ "org.elasticsearch.cluster.service:TRACE,"
|
||||
+ "org.elasticsearch.gateway:TRACE,"
|
||||
+ "org.elasticsearch.indices.store:TRACE")
|
||||
public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception {
|
||||
final List<String> nodes = startCluster(3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
||||
));
|
||||
|
||||
ensureGreen();
|
||||
String isolatedNode = internalCluster().getMasterName();
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
|
||||
|
||||
// make sure cluster reforms
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
// make sure isolated need picks up on things.
|
||||
assertNoMaster(isolatedNode, TimeValue.timeValueSeconds(40));
|
||||
|
||||
// restore isolation
|
||||
networkDisruption.stopDisrupting();
|
||||
|
||||
for (String node : nodes) {
|
||||
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
|
||||
true, node);
|
||||
}
|
||||
|
||||
logger.info("issue a reroute");
|
||||
// trigger a reroute now, instead of waiting for the background reroute of RerouteService
|
||||
assertAcked(client().admin().cluster().prepareReroute());
|
||||
// and wait for it to finish and for the cluster to stabilize
|
||||
ensureGreen("test");
|
||||
|
||||
// verify all cluster states are the same
|
||||
// use assert busy to wait for cluster states to be applied (as publish_timeout has low value)
|
||||
assertBusy(() -> {
|
||||
ClusterState state = null;
|
||||
for (String node : nodes) {
|
||||
ClusterState nodeState = getNodeClusterState(node);
|
||||
if (state == null) {
|
||||
state = nodeState;
|
||||
continue;
|
||||
}
|
||||
// assert nodes are identical
|
||||
try {
|
||||
assertEquals("unequal versions", state.version(), nodeState.version());
|
||||
assertEquals("unequal node count", state.nodes().getSize(), nodeState.nodes().getSize());
|
||||
assertEquals("different masters ", state.nodes().getMasterNodeId(), nodeState.nodes().getMasterNodeId());
|
||||
assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version());
|
||||
assertEquals("different routing", state.routingTable().toString(), nodeState.routingTable().toString());
|
||||
} catch (AssertionError t) {
|
||||
fail("failed comparing cluster state: " + t.getMessage() + "\n" +
|
||||
"--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
|
||||
"\n--- cluster state [" + node + "]: ---\n" + nodeState);
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the proper block is applied when nodes loose their master
|
||||
*/
|
||||
public void testVerifyApiBlocksDuringPartition() throws Exception {
|
||||
startCluster(3);
|
||||
|
||||
// Makes sure that the get request can be executed on each node locally:
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
));
|
||||
|
||||
// Everything is stable now, it is now time to simulate evil...
|
||||
// but first make sure we have no initializing shards and all is green
|
||||
// (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
|
||||
ensureGreen("test");
|
||||
|
||||
TwoPartitions partitions = TwoPartitions.random(random(), internalCluster().getNodeNames());
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
|
||||
assertEquals(1, partitions.getMinoritySide().size());
|
||||
final String isolatedNode = partitions.getMinoritySide().iterator().next();
|
||||
assertEquals(2, partitions.getMajoritySide().size());
|
||||
final String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
|
||||
|
||||
// Simulate a network issue between the unlucky node and the rest of the cluster.
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
|
||||
assertNoMaster(isolatedNode, DiscoverySettings.NO_MASTER_BLOCK_WRITES, TimeValue.timeValueSeconds(10));
|
||||
|
||||
|
||||
logger.info("wait until elected master has been removed and a new 2 node cluster was from (via [{}])", isolatedNode);
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
for (String node : partitions.getMajoritySide()) {
|
||||
ClusterState nodeState = getNodeClusterState(node);
|
||||
boolean success = true;
|
||||
if (nodeState.nodes().getMasterNode() == null) {
|
||||
success = false;
|
||||
}
|
||||
if (!nodeState.blocks().global().isEmpty()) {
|
||||
success = false;
|
||||
}
|
||||
if (!success) {
|
||||
fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n"
|
||||
+ nodeState);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
networkDisruption.stopDisrupting();
|
||||
|
||||
// Wait until the master node sees al 3 nodes again.
|
||||
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()));
|
||||
|
||||
logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all");
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
|
||||
.get();
|
||||
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
|
||||
assertNoMaster(isolatedNode, DiscoverySettings.NO_MASTER_BLOCK_ALL, TimeValue.timeValueSeconds(10));
|
||||
|
||||
// make sure we have stable cluster & cross partition recoveries are canceled by the removal of the missing node
|
||||
// the unresponsive partition causes recoveries to only time out after 15m (default) and these will cause
|
||||
// the test to fail due to unfreed resources
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
}
|
||||
|
||||
void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
|
||||
for (final String node : nodes) {
|
||||
assertTrue(
|
||||
"node [" + node + "] is still joining master",
|
||||
awaitBusy(
|
||||
() -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -443,8 +443,8 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
|
|||
Aggregations aggregations = classBuckets.next().getAggregations();
|
||||
SignificantTerms sigTerms = aggregations.get("mySignificantTerms");
|
||||
|
||||
Collection<SignificantTerms.Bucket> classA = sigTerms.getBuckets();
|
||||
Iterator<SignificantTerms.Bucket> classBBucketIterator = sigTerms.getBuckets().iterator();
|
||||
List<? extends SignificantTerms.Bucket> classA = sigTerms.getBuckets();
|
||||
Iterator<SignificantTerms.Bucket> classBBucketIterator = sigTerms.iterator();
|
||||
assertThat(classA.size(), greaterThan(0));
|
||||
for (SignificantTerms.Bucket classABucket : classA) {
|
||||
SignificantTerms.Bucket classBBucket = classBBucketIterator.next();
|
||||
|
|
|
@ -410,6 +410,7 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
|
|||
public TimeValue expectedTimeToHeal() {
|
||||
return TimeValue.timeValueMillis(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -501,4 +502,5 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
|
|||
return "network delays for [" + delay + "]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.test.disruption;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -44,8 +43,8 @@ public class NetworkDisruptionIT extends ESIntegTestCase {
|
|||
public void testNetworkPartitionWithNodeShutdown() throws IOException {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
String[] nodeNames = internalCluster().getNodeNames();
|
||||
NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]),
|
||||
new NetworkUnresponsive());
|
||||
NetworkDisruption networkDisruption =
|
||||
new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]), new NetworkDisruption.NetworkUnresponsive());
|
||||
internalCluster().setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames[0]));
|
||||
|
|
Loading…
Reference in New Issue