diff --git a/.gitignore b/.gitignore index b4ec8795057..5d7dbbefdc8 100644 --- a/.gitignore +++ b/.gitignore @@ -38,11 +38,14 @@ dependency-reduced-pom.xml # osx stuff .DS_Store +# default folders in which the create_bwc_index.py expects to find old es versions in +/backwards +/dev-tools/backwards + # needed in case docs build is run...maybe we can configure doc build to generate files under build? html_docs # random old stuff that we should look at the necessity of... /tmp/ -backwards/ eclipse-build diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 74cae08298b..4c6771ccda7 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -268,6 +268,7 @@ class ClusterFormationTasks { static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) { Map esConfig = [ 'cluster.name' : node.clusterName, + 'node.name' : "node-" + node.nodeNum, 'pidfile' : node.pidFile, 'path.repo' : "${node.sharedDir}/repo", 'path.shared_data' : "${node.sharedDir}/", diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 7a12ab8ace2..aef99494d92 100644 --- a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -214,7 +215,11 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr type = in.readString(); id = in.readString(); version = in.readZLong(); - seqNo = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } forcedRefresh = in.readBoolean(); result = Result.readFrom(in); } @@ -226,7 +231,9 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr out.writeString(type); out.writeString(id); out.writeZLong(version); - out.writeZLong(seqNo); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } out.writeBoolean(forcedRefresh); result.writeTo(out); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 83eaf11ca3a..ac32b16eb57 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -58,6 +58,6 @@ public class ShardFlushRequest extends ReplicationRequest { @Override public String toString() { - return "flush {" + super.toString() + "}"; + return "flush {" + shardId + "}"; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 877db0579a0..150b7c6a52b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -103,7 +104,9 @@ public class ShardStats implements Streamable, Writeable, ToXContent { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); - seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + } } @Override @@ -114,7 +117,9 @@ public class ShardStats implements Streamable, Writeable, ToXContent { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); - out.writeOptionalWriteable(seqNoStats); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeOptionalWriteable(seqNoStats); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cef89e1ce78..86024e4dcd5 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -151,7 +150,7 @@ public class TransportShardBulkAction extends TransportWriteAction implement out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeOptionalString(null); + // Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null. + // On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for + // the transport layer OK as it will be ignored. + out.writeOptionalString("0"); out.writeOptionalWriteable(null); } out.writeBytesReference(source); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 70220679752..9ed9f7f7cd1 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -165,19 +165,22 @@ public class TransportIndexAction extends TransportWriteAction> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + /** * Constructor for deserialization. */ @@ -62,11 +66,32 @@ public abstract class ReplicatedWriteRequest public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } + } + + /** + * Returns the sequence number for this operation. The sequence number is assigned while the operation + * is performed on the primary shard. + */ + public long getSeqNo() { + return seqNo; + } + + /** sets the sequence number for this operation. should only be called on the primary shard */ + public void setSeqNo(long seqNo) { + this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 47284789850..25dcc29a5c3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -283,7 +283,7 @@ public class ReplicationOperation< } private void decPendingAndFinishIfNeeded() { - assert pendingActions.get() > 0; + assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index d520b3d4e70..091f96c408f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,7 +55,6 @@ public abstract class ReplicationRequest(request, primary.allocationId().getId())); @@ -950,6 +951,8 @@ public abstract class TransportReplicationAction< public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request, indexShard); if (result.replicaRequest() != null) { + assert result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); } return result; @@ -983,16 +986,25 @@ public abstract class TransportReplicationAction< @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - localCheckpoint = in.readZLong(); - allocationId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.readFrom(in); + localCheckpoint = in.readZLong(); + allocationId = in.readString(); + } else { + // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. + } } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeZLong(localCheckpoint); - out.writeString(allocationId); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.writeTo(out); + out.writeZLong(localCheckpoint); + out.writeString(allocationId); + } else { + // we use to write empty responses + Empty.INSTANCE.writeTo(out); + } } @Override @@ -1016,10 +1028,9 @@ public abstract class TransportReplicationAction< listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); return; } - transportService.sendRequest(node, transportReplicaAction, - new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions, - // Eclipse can't handle when this is <> so we specify the type here. - new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + final ConcreteShardRequest concreteShardRequest = + new ConcreteShardRequest<>(request, replica.allocationId().getId()); + sendReplicaRequest(concreteShardRequest, node, listener); } @Override @@ -1060,6 +1071,14 @@ public abstract class TransportReplicationAction< } } + /** sends the given replica request to the supplied nodes */ + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, + // Eclipse can't handle when this is <> so we specify the type here. + new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + } + /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static final class ConcreteShardRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 3ea61385f1c..0f9db99326d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -204,7 +204,7 @@ public class MappingMetaData extends AbstractDiffable { // timestamp out.writeBoolean(false); // enabled out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format()); - out.writeOptionalString(null); + out.writeOptionalString("now"); // 5.x default out.writeOptionalBoolean(null); } out.writeBoolean(hasParentField()); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 8e877298313..6e13573794d 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -20,20 +20,21 @@ package org.elasticsearch.index.seqno; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -41,8 +42,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; -import java.io.UnsupportedEncodingException; public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -65,6 +64,17 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.sendReplicaRequest(concreteShardRequest, node, listener); + } else { + listener.onResponse( + new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + @Override protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request, IndexShard indexShard) throws Exception { long checkpoint = indexShard.getGlobalCheckpoint(); @@ -105,6 +115,11 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -134,6 +149,14 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings, true) + " }"))); + } + + private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { + updateIndexSetting(name, settings.build()); + } + private void updateIndexSetting(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), + new StringEntity(Strings.toString(settings, true)))); + } + + protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final int id = idStart + i; + assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), + new StringEntity("{\"test\": \"test_" + id + "\"}"))); + } + return numDocs; + } + + public void testSeqNoCheckpoints() throws Exception { + Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered: {}", nodes.toString()); + final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", bwcNames); + + final boolean checkGlobalCheckpoints = nodes.getMaster().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED); + logger.info("master version is [{}], global checkpoints will be [{}]", nodes.getMaster().getVersion(), + checkGlobalCheckpoints ? "checked" : "not be checked"); + if (checkGlobalCheckpoints) { + settings.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "100ms"); + } + final String index = "test"; + createIndex(index, settings.build()); + try (RestClient newNodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + int numDocs = indexDocs(index, 0, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + + logger.info("allowing shards on all nodes"); + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + logger.info("indexing some more docs"); + numDocs += indexDocs(index, numDocs, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + logger.info("moving primary to new node"); + Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); + ensureGreen(); + logger.info("indexing some more docs"); + int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5)); + numDocs += numDocsOnNewPrimary; + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + } + } + + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + assertBusy(() -> { + try { + List shards = buildShards(nodes, client); + Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); + assertNotNull("failed to find primary shard", primaryShard); + final long expectedGlobalCkp; + final long expectMaxSeqNo; + logger.info("primary resolved to node {}", primaryShard.getNode()); + if (primaryShard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + expectMaxSeqNo = numDocs - 1; + expectedGlobalCkp = numDocs - 1; + } else { + expectedGlobalCkp = SequenceNumbersService.UNASSIGNED_SEQ_NO; + expectMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + } + for (Shard shard : shards) { + if (shard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + final SeqNoStats seqNoStats = shard.getSeqNoStats(); + logger.info("stats for {}, primary [{}]: [{}]", shard.getNode(), shard.isPrimary(), seqNoStats); + assertThat("max_seq no on " + shard.getNode() + " is wrong", seqNoStats.getMaxSeqNo(), equalTo(expectMaxSeqNo)); + assertThat("localCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getLocalCheckpoint(), equalTo(expectMaxSeqNo)); + if (checkGlobalCheckpoints) { + assertThat("globalCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getGlobalCheckpoint(), equalTo(expectedGlobalCkp)); + } + } else { + logger.info("skipping seq no test on {}", shard.getNode()); + } + } + } catch (IOException e) { + throw new AssertionError("unexpected io exception", e); + } + }); + } + + private List buildShards(Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); + List shardStats = objectPath(response).evaluate("indices.test.shards.0"); + ArrayList shards = new ArrayList<>(); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.getSafe(nodeId); + final SeqNoStats seqNoStats; + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); + Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); + Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); + seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + } else { + seqNoStats = null; + } + shards.add(new Shard(node, primary, seqNoStats)); + } + return shards; + } + + private Nodes buildNodeAndVersions() throws IOException { + Response response = client().performRequest("GET", "_nodes"); + ObjectPath objectPath = objectPath(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + Nodes nodes = new Nodes(); + for (String id : nodesAsMap.keySet()) { + nodes.add(new Node( + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version")), + HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); + } + response = client().performRequest("GET", "_cluster/state"); + nodes.setMasterNodeId(objectPath(response).evaluate("master_node")); + return nodes; + } + + final class Nodes extends HashMap { + + private String masterNodeId = null; + + public Node getMaster() { + return get(masterNodeId); + } + + public void setMasterNodeId(String id) { + if (get(id) == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found. got:" + toString()); + } + masterNodeId = id; + } + + public void add(Node node) { + put(node.getId(), node); + } + + public List getNewNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().after(bwcVersion)).collect(Collectors.toList()); + } + + public List getBWCNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().equals(bwcVersion)).collect(Collectors.toList()); + } + + public Version getBWCVersion() { + if (isEmpty()) { + throw new IllegalStateException("no nodes available"); + } + return Version.fromId(values().stream().map(node -> node.getVersion().id).min(Integer::compareTo).get()); + } + + public Node getSafe(String id) { + Node node = get(id); + if (node == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found"); + } + return node; + } + + @Override + public String toString() { + return "Nodes{" + + "masterNodeId='" + masterNodeId + "'\n" + + values().stream().map(Node::toString).collect(Collectors.joining("\n")) + + '}'; + } + } + + final class Node { + private final String id; + private final String nodeName; + private final Version version; + private final HttpHost publishAddress; + + Node(String id, String nodeName, Version version, HttpHost publishAddress) { + this.id = id; + this.nodeName = nodeName; + this.version = version; + this.publishAddress = publishAddress; + } + + public String getId() { + return id; + } + + public String getNodeName() { + return nodeName; + } + + public HttpHost getPublishAddress() { + return publishAddress; + } + + public Version getVersion() { + return version; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", nodeName='" + nodeName + '\'' + + ", version=" + version + + '}'; + } + } + + final class Shard { + private final Node node; + private final boolean Primary; + private final SeqNoStats seqNoStats; + + Shard(Node node, boolean primary, SeqNoStats seqNoStats) { + this.node = node; + Primary = primary; + this.seqNoStats = seqNoStats; + } + + public Node getNode() { + return node; + } + + public boolean isPrimary() { + return Primary; + } + + public SeqNoStats getSeqNoStats() { + return seqNoStats; + } + + @Override + public String toString() { + return "Shard{" + + "node=" + node + + ", Primary=" + Primary + + ", seqNoStats=" + seqNoStats + + '}'; + } + } +} diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index e17e2454108..182e6a9f7d9 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -25,7 +25,7 @@ task oldClusterTest(type: RestIntegTestTask) { mustRunAfter(precommit) cluster { distribution = 'zip' - bwcVersion = '6.0.0-alpha1-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop + bwcVersion = '5.2.0-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop numBwcNodes = 2 numNodes = 2 clusterName = 'rolling-upgrade' diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml index 8c7cd83b0e0..4a37734d284 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml @@ -1,5 +1,9 @@ --- "Help": + - skip: + version: " - 5.99.99" + reason: seq no stats were added in 6.0.0 + - do: cat.shards: help: true diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 0fc8cb4506b..975e6e2f866 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -111,8 +111,8 @@ public abstract class ESRestTestCase extends ESTestCase { } clusterHosts = unmodifiableList(hosts); logger.info("initializing REST clients against {}", clusterHosts); - client = buildClient(restClientSettings()); - adminClient = buildClient(restAdminSettings()); + client = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + adminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); } assert client != null; assert adminClient != null; @@ -272,8 +272,8 @@ public abstract class ESRestTestCase extends ESTestCase { return "http"; } - private RestClient buildClient(Settings settings) throws IOException { - RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); String keystorePath = settings.get(TRUSTSTORE_PATH); if (keystorePath != null) { final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java index 6311944fdcb..265fd7b3e85 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java @@ -46,17 +46,28 @@ public class ObjectPath { this.object = object; } + + /** + * A utility method that creates an {@link ObjectPath} via {@link #ObjectPath(Object)} returns + * the result of calling {@link #evaluate(String)} on it. + */ + public static T evaluate(Object object, String path) throws IOException { + return new ObjectPath(object).evaluate(path, Stash.EMPTY); + } + + /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path) throws IOException { + public T evaluate(String path) throws IOException { return evaluate(path, Stash.EMPTY); } /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path, Stash stash) throws IOException { + @SuppressWarnings("unchecked") + public T evaluate(String path, Stash stash) throws IOException { String[] parts = parsePath(path); Object object = this.object; for (String part : parts) { @@ -65,7 +76,7 @@ public class ObjectPath { return null; } } - return object; + return (T)object; } @SuppressWarnings("unchecked")