[CCR] Also copy routing_num_shards from leader to follow index. (#30894)
Bug was introduced when create and follow api was added in #30602
This commit is contained in:
parent
16402305bb
commit
a82f2e31b4
|
@ -45,7 +45,6 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -247,20 +246,21 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||||
IndexMetaData currentIndex = currentState.metaData().index(request.getFollowRequest().getFollowIndex());
|
String followIndex = request.getFollowRequest().getFollowIndex();
|
||||||
|
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
|
||||||
if (currentIndex != null) {
|
if (currentIndex != null) {
|
||||||
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
|
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||||
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(request.getFollowRequest().getFollowIndex());
|
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);
|
||||||
|
|
||||||
// Copy all settings, but overwrite a few settings.
|
// Copy all settings, but overwrite a few settings.
|
||||||
Settings.Builder settingsBuilder = Settings.builder();
|
Settings.Builder settingsBuilder = Settings.builder();
|
||||||
settingsBuilder.put(leaderIndexMetaData.getSettings());
|
settingsBuilder.put(leaderIndexMetaData.getSettings());
|
||||||
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
|
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
|
||||||
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
|
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
|
||||||
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowIndex());
|
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
|
||||||
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
|
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
|
||||||
imdBuilder.settings(settingsBuilder);
|
imdBuilder.settings(settingsBuilder);
|
||||||
|
|
||||||
|
@ -268,7 +268,9 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
|
||||||
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
|
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
|
||||||
imdBuilder.putMapping(cursor.value);
|
imdBuilder.putMapping(cursor.value);
|
||||||
}
|
}
|
||||||
mdBuilder.put(imdBuilder.build(), false);
|
imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards());
|
||||||
|
IndexMetaData followIMD = imdBuilder.build();
|
||||||
|
mdBuilder.put(followIMD, false);
|
||||||
|
|
||||||
ClusterState.Builder builder = ClusterState.builder(currentState);
|
ClusterState.Builder builder = ClusterState.builder(currentState);
|
||||||
builder.metaData(mdBuilder.build());
|
builder.metaData(mdBuilder.build());
|
||||||
|
@ -279,7 +281,10 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
|
||||||
updatedState = allocationService.reroute(
|
updatedState = allocationService.reroute(
|
||||||
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
|
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
|
||||||
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");
|
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");
|
||||||
|
|
||||||
|
logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
|
||||||
|
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());
|
||||||
|
|
||||||
return updatedState;
|
return updatedState;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -145,7 +145,6 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
assertThat(operation.id(), equalTo("5"));
|
assertThat(operation.id(), equalTo("5"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30227")
|
|
||||||
public void testFollowIndex() throws Exception {
|
public void testFollowIndex() throws Exception {
|
||||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
|
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
|
||||||
|
@ -162,6 +161,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||||
|
|
||||||
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
||||||
|
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
|
||||||
for (int i = 0; i < firstBatchNumDocs; i++) {
|
for (int i = 0; i < firstBatchNumDocs; i++) {
|
||||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||||
|
@ -185,6 +185,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
unfollowIndex("index2");
|
unfollowIndex("index2");
|
||||||
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
|
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
|
||||||
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
||||||
|
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
|
||||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||||
|
@ -408,7 +409,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
|
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
|
||||||
return () -> {
|
return () -> {
|
||||||
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
|
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
|
||||||
assertTrue("doc with id [" + value + "] does not exist", getResponse.isExists());
|
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
|
||||||
assertTrue((getResponse.getSource().containsKey("f")));
|
assertTrue((getResponse.getSource().containsKey("f")));
|
||||||
assertThat(getResponse.getSource().get("f"), equalTo(value));
|
assertThat(getResponse.getSource().get("f"), equalTo(value));
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue