Improve CcrRepositoryIT mappings tests (#38817)

Currently we index documents concurrently to attempt to ensure that we
update mappings during the restore process. However, this does not
actually test that the mapping will be correct and is dangerous as it
can lead to a misalignment between the max sequence number and the local
checkpoint. If these are not aligned, peer recovery cannot be completed
without initiating following which this test does not do. That causes
teardown assertions to fail.

This commit removes the concurrent indexing and flushes after the
documents are indexed. Additionally it modifies the mapping specific
test to ensure that there is a mapping update when the restore session
is initiated. This mapping update is picked up at the end of the restore
by the follower.
This commit is contained in:
Tim Brooks 2019-02-13 07:18:24 -07:00
parent e769cb4efd
commit ec08581319
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
3 changed files with 55 additions and 51 deletions

View File

@ -23,7 +23,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {
public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/clear";
public static final String NAME = "internal:admin/ccr/restore/session/clear";
private ClearCcrRestoreSessionAction() {
super(NAME);

View File

@ -33,7 +33,7 @@ import java.io.IOException;
public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse> {
public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/put";
public static final String NAME = "internal:admin/ccr/restore/session/put";
private PutCcrRestoreSessionAction() {
super(NAME);

View File

@ -40,6 +40,7 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
@ -48,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -59,8 +61,6 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
// TODO: is completed.
public class CcrRepositoryIT extends CcrIntegTestCase {
private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
@ -197,36 +197,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
AtomicBoolean isRunning = new AtomicBoolean(true);
// Concurrently index new docs with mapping changes
Thread thread = new Thread(() -> {
char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
for (char c : chars) {
if (isRunning.get() == false) {
break;
}
final String source;
long l = randomLongBetween(0, 50000);
if (randomBoolean()) {
source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l);
} else {
source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l);
}
for (int i = 64; i < 150; i++) {
if (isRunning.get() == false) {
break;
}
leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get();
}
}
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
}
});
thread.start();
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
@ -245,9 +215,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
assertExpectedDocument(followerIndex, i);
}
isRunning.set(false);
thread.join();
settingsRequest = new ClusterUpdateSettingsRequest();
ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
@ -421,23 +388,60 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
.indexSettings(settingsBuilder);
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();
List<MockTransportService> transportServices = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean updateSent = new AtomicBoolean(false);
Runnable updateMappings = () -> {
if (updateSent.compareAndSet(false, true)) {
leaderClient()
.admin()
.indices()
.preparePutMapping(leaderIndex)
.setType("doc")
.setSource("{\"properties\":{\"k\":{\"type\":\"long\"}}}", XContentType.JSON)
.execute(ActionListener.wrap(latch::countDown));
}
try {
latch.await();
} catch (InterruptedException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
};
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
MockTransportService mockTransportService = (MockTransportService) transportService;
transportServices.add(mockTransportService);
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
updateMappings.run();
connection.sendRequest(requestId, action, request, options);
} else {
connection.sendRequest(requestId, action, request, options);
}
});
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(followerIndex);
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
try {
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(followerIndex);
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
} finally {
for (MockTransportService transportService : transportServices) {
transportService.clearAllRules();
}
}
}
private void assertExpectedDocument(String followerIndex, final int value) {