diff --git a/qa/repository-multi-version/build.gradle b/qa/repository-multi-version/build.gradle
new file mode 100644
index 00000000000..ed062e59eee
--- /dev/null
+++ b/qa/repository-multi-version/build.gradle
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.elasticsearch.gradle.Version
+import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
+
+apply plugin: 'elasticsearch.testclusters'
+apply plugin: 'elasticsearch.standalone-test'
+
+tasks.register("bwcTest") {
+ description = 'Runs backwards compatibility tests.'
+ group = 'verification'
+}
+
+dependencies {
+ testCompile project(':client:rest-high-level')
+}
+
+for (Version bwcVersion : bwcVersions.indexCompatible) {
+ String baseName = "v${bwcVersion}"
+ String oldClusterName = "${baseName}-old"
+ String newClusterName = "${baseName}-new"
+
+ def clusterSettings = { v ->
+ return {
+ version = v
+ numberOfNodes = 2
+ setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
+ javaHome = BuildParams.runtimeJavaHome
+ }
+ }
+
+ testClusters {
+ "${oldClusterName}" clusterSettings(bwcVersion.toString())
+ "${newClusterName}" clusterSettings(project.version)
+ }
+
+ tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) {
+ useCluster testClusters."${oldClusterName}"
+ mustRunAfter(precommit)
+ doFirst {
+ project.delete("${buildDir}/cluster/shared/repo/${baseName}")
+ }
+ systemProperty 'tests.rest.suite', 'step1'
+ }
+
+ tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) {
+ useCluster testClusters."${newClusterName}"
+ dependsOn "${baseName}#Step1OldClusterTest"
+ systemProperty 'tests.rest.suite', 'step2'
+ }
+
+ tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) {
+ useCluster testClusters."${oldClusterName}"
+ dependsOn "${baseName}#Step2NewClusterTest"
+ systemProperty 'tests.rest.suite', 'step3'
+ }
+
+ tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) {
+ useCluster testClusters."${newClusterName}"
+ dependsOn "${baseName}#Step3OldClusterTest"
+ systemProperty 'tests.rest.suite', 'step4'
+ }
+
+ tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach {
+ it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT")
+ it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
+ def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}"
+ it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}")
+ it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}")
+ }
+
+ if (project.bwc_tests_enabled) {
+ bwcTest.dependsOn(
+ tasks.register("${baseName}#bwcTest") {
+ dependsOn tasks.named("${baseName}#Step4NewClusterTest")
+ }
+ )
+ }
+}
+
+task bwcTestSnapshots {
+ if (project.bwc_tests_enabled) {
+ for (final def version : bwcVersions.unreleasedIndexCompatible) {
+ dependsOn "v${version}#bwcTest"
+ }
+ }
+}
+
+check.dependsOn(bwcTestSnapshots)
+
+configurations {
+ testArtifacts.extendsFrom testRuntime
+}
+
+task testJar(type: Jar) {
+ appendix 'test'
+ from sourceSets.test.output
+}
+
+artifacts {
+ testArtifacts testJar
+}
+
+test.enabled = false
diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java
new file mode 100644
index 00000000000..ec2a70c6f84
--- /dev/null
+++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.upgrades;
+
+import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.snapshots.RestoreInfo;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple
+ * clusters of different versions. Concretely this test suite is simulating the following scenario:
+ *
+ * - Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}
+ * - Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}
+ * - Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}
+ * - Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}
+ *
+ * TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository
+ * is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6.
+ */
+public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
+
+ private enum TestStep {
+ STEP1_OLD_CLUSTER("step1"),
+ STEP2_NEW_CLUSTER("step2"),
+ STEP3_OLD_CLUSTER("step3"),
+ STEP4_NEW_CLUSTER("step4");
+
+ private final String name;
+
+ TestStep(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ public static TestStep parse(String value) {
+ switch (value) {
+ case "step1":
+ return STEP1_OLD_CLUSTER;
+ case "step2":
+ return STEP2_NEW_CLUSTER;
+ case "step3":
+ return STEP3_OLD_CLUSTER;
+ case "step4":
+ return STEP4_NEW_CLUSTER;
+ default:
+ throw new AssertionError("unknown test step: " + value);
+ }
+ }
+ }
+
+ protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite"));
+
+ @Override
+ protected boolean preserveSnapshotsUponCompletion() {
+ return true;
+ }
+
+ @Override
+ protected boolean preserveReposUponCompletion() {
+ return true;
+ }
+
+ public void testCreateAndRestoreSnapshot() throws IOException {
+ final String repoName = getTestName();
+ try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
+ final int shards = 3;
+ createIndex(client, "test-index", shards);
+ createRepository(client, repoName, false);
+ createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
+ final String snapshotToDeleteName = "snapshot-to-delete";
+ // Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run
+ // as part of the snapshot delete
+ createSnapshot(client, repoName, snapshotToDeleteName);
+ final List