Rollover previously requested index stats for all indices in the provided alias, which causes an exception when there is a closed index with that alias. This commit adjusts the IndicesOptions used on the index stats request so that closed indices are ignored, rather than throwing an exception.
This commit is contained in:
parent
43f588a29e
commit
e47bdf760e
|
@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||||
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||||
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
|
@ -49,11 +51,12 @@ import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.index.shard.DocsStats;
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -108,7 +111,13 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void masterOperation(final RolloverRequest rolloverRequest, final ClusterState state,
|
protected void masterOperation(RolloverRequest request, ClusterState state,
|
||||||
|
ActionListener<RolloverResponse> listener) throws Exception {
|
||||||
|
throw new UnsupportedOperationException("The task parameter is required");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState state,
|
||||||
final ActionListener<RolloverResponse> listener) {
|
final ActionListener<RolloverResponse> listener) {
|
||||||
final MetaData metaData = state.metaData();
|
final MetaData metaData = state.metaData();
|
||||||
validate(metaData, rolloverRequest);
|
validate(metaData, rolloverRequest);
|
||||||
|
@ -124,7 +133,12 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
final String rolloverIndexName = indexNameExpressionResolver.resolveDateMathExpression(unresolvedName);
|
final String rolloverIndexName = indexNameExpressionResolver.resolveDateMathExpression(unresolvedName);
|
||||||
MetaDataCreateIndexService.validateIndexName(rolloverIndexName, state); // will fail if the index already exists
|
MetaDataCreateIndexService.validateIndexName(rolloverIndexName, state); // will fail if the index already exists
|
||||||
checkNoDuplicatedAliasInIndexTemplate(metaData, rolloverIndexName, rolloverRequest.getAlias());
|
checkNoDuplicatedAliasInIndexTemplate(metaData, rolloverIndexName, rolloverRequest.getAlias());
|
||||||
client.admin().indices().prepareStats(rolloverRequest.getAlias()).clear().setDocs(true).execute(
|
IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getAlias())
|
||||||
|
.clear()
|
||||||
|
.indicesOptions(IndicesOptions.fromOptions(true, false, true, true))
|
||||||
|
.docs(true);
|
||||||
|
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||||
|
client.execute(IndicesStatsAction.INSTANCE, statsRequest,
|
||||||
new ActionListener<IndicesStatsResponse>() {
|
new ActionListener<IndicesStatsResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(IndicesStatsResponse statsResponse) {
|
public void onResponse(IndicesStatsResponse statsResponse) {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -381,4 +382,54 @@ public class RolloverIT extends ESIntegTestCase {
|
||||||
assertThat(error.getMessage(), equalTo(
|
assertThat(error.getMessage(), equalTo(
|
||||||
"Rollover alias [logs-write] can point to multiple indices, found duplicated alias [[logs-write]] in index template [logs]"));
|
"Rollover alias [logs-write] can point to multiple indices, found duplicated alias [[logs-write]] in index template [logs]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRolloverWithClosedIndexInAlias() throws Exception {
|
||||||
|
final String aliasName = "alias";
|
||||||
|
final String openNonwriteIndex = "open-index-nonwrite";
|
||||||
|
final String closedIndex = "closed-index-nonwrite";
|
||||||
|
final String writeIndexPrefix = "write-index-";
|
||||||
|
assertAcked(prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)).get());
|
||||||
|
assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get());
|
||||||
|
assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
|
||||||
|
|
||||||
|
index(closedIndex, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
index(aliasName, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
index(aliasName, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
refresh(aliasName);
|
||||||
|
|
||||||
|
assertAcked(client().admin().indices().prepareClose(closedIndex).get());
|
||||||
|
|
||||||
|
RolloverResponse rolloverResponse = client().admin().indices().prepareRolloverIndex(aliasName)
|
||||||
|
.addMaxIndexDocsCondition(1)
|
||||||
|
.get();
|
||||||
|
assertTrue(rolloverResponse.isRolledOver());
|
||||||
|
assertEquals(writeIndexPrefix + "000001", rolloverResponse.getOldIndex());
|
||||||
|
assertEquals(writeIndexPrefix + "000002", rolloverResponse.getNewIndex());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRolloverWithClosedWriteIndex() throws Exception {
|
||||||
|
final String aliasName = "alias";
|
||||||
|
final String openNonwriteIndex = "open-index-nonwrite";
|
||||||
|
final String closedIndex = "closed-index-nonwrite";
|
||||||
|
final String writeIndexPrefix = "write-index-";
|
||||||
|
assertAcked(prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)).get());
|
||||||
|
assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get());
|
||||||
|
assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
|
||||||
|
|
||||||
|
index(closedIndex, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
index(aliasName, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
index(aliasName, SINGLE_MAPPING_NAME, null, "{\"foo\": \"bar\"}");
|
||||||
|
refresh(aliasName);
|
||||||
|
|
||||||
|
assertAcked(client().admin().indices().prepareClose(closedIndex).get());
|
||||||
|
assertAcked(client().admin().indices().prepareClose(writeIndexPrefix + "000001").get());
|
||||||
|
ensureGreen(aliasName);
|
||||||
|
|
||||||
|
RolloverResponse rolloverResponse = client().admin().indices().prepareRolloverIndex(aliasName)
|
||||||
|
.addMaxIndexDocsCondition(1)
|
||||||
|
.get();
|
||||||
|
assertTrue(rolloverResponse.isRolledOver());
|
||||||
|
assertEquals(writeIndexPrefix + "000001", rolloverResponse.getOldIndex());
|
||||||
|
assertEquals(writeIndexPrefix + "000002", rolloverResponse.getNewIndex());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,20 +21,19 @@ package org.elasticsearch.action.admin.indices.rollover;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||||
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
|
||||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.client.AdminClient;
|
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.IndicesAdminClient;
|
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.AliasAction;
|
import org.elasticsearch.cluster.metadata.AliasAction;
|
||||||
|
@ -45,6 +44,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
|
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
|
@ -71,6 +71,7 @@ import org.elasticsearch.index.shard.ShardPath;
|
||||||
import org.elasticsearch.index.store.StoreStats;
|
import org.elasticsearch.index.store.StoreStats;
|
||||||
import org.elasticsearch.index.warmer.WarmerStats;
|
import org.elasticsearch.index.warmer.WarmerStats;
|
||||||
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
@ -92,6 +93,7 @@ import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -366,9 +368,12 @@ public class TransportRolloverActionTests extends ESTestCase {
|
||||||
assertThat(ex.getMessage(), containsString("index template [test-template]"));
|
assertThat(ex.getMessage(), containsString("index template [test-template]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPrimariesFromWriteIndex() {
|
public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPrimariesFromWriteIndex() throws Exception {
|
||||||
final TransportService mockTransportService = mock(TransportService.class);
|
final TransportService mockTransportService = mock(TransportService.class);
|
||||||
final ClusterService mockClusterService = mock(ClusterService.class);
|
final ClusterService mockClusterService = mock(ClusterService.class);
|
||||||
|
final DiscoveryNode mockNode = mock(DiscoveryNode.class);
|
||||||
|
when(mockNode.getId()).thenReturn("mocknode");
|
||||||
|
when(mockClusterService.localNode()).thenReturn(mockNode);
|
||||||
final ThreadPool mockThreadPool = mock(ThreadPool.class);
|
final ThreadPool mockThreadPool = mock(ThreadPool.class);
|
||||||
final MetaDataCreateIndexService mockCreateIndexService = mock(MetaDataCreateIndexService.class);
|
final MetaDataCreateIndexService mockCreateIndexService = mock(MetaDataCreateIndexService.class);
|
||||||
final IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
final IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
||||||
|
@ -377,31 +382,23 @@ public class TransportRolloverActionTests extends ESTestCase {
|
||||||
final MetaDataIndexAliasesService mdIndexAliasesService = mock(MetaDataIndexAliasesService.class);
|
final MetaDataIndexAliasesService mdIndexAliasesService = mock(MetaDataIndexAliasesService.class);
|
||||||
|
|
||||||
final Client mockClient = mock(Client.class);
|
final Client mockClient = mock(Client.class);
|
||||||
final AdminClient mockAdminClient = mock(AdminClient.class);
|
|
||||||
final IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class);
|
|
||||||
when(mockClient.admin()).thenReturn(mockAdminClient);
|
|
||||||
when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient);
|
|
||||||
|
|
||||||
final IndicesStatsRequestBuilder mockIndicesStatsBuilder = mock(IndicesStatsRequestBuilder.class);
|
|
||||||
when(mockIndicesAdminClient.prepareStats(any())).thenReturn(mockIndicesStatsBuilder);
|
|
||||||
final Map<String, IndexStats> indexStats = new HashMap<>();
|
final Map<String, IndexStats> indexStats = new HashMap<>();
|
||||||
int total = randomIntBetween(500, 1000);
|
int total = randomIntBetween(500, 1000);
|
||||||
indexStats.put("logs-index-000001", createIndexStats(200L, total));
|
indexStats.put("logs-index-000001", createIndexStats(200L, total));
|
||||||
indexStats.put("logs-index-000002", createIndexStats(300L, total));
|
indexStats.put("logs-index-000002", createIndexStats(300L, total));
|
||||||
final IndicesStatsResponse statsResponse = createAliasToMultipleIndicesStatsResponse(indexStats);
|
final IndicesStatsResponse statsResponse = createAliasToMultipleIndicesStatsResponse(indexStats);
|
||||||
when(mockIndicesStatsBuilder.clear()).thenReturn(mockIndicesStatsBuilder);
|
|
||||||
when(mockIndicesStatsBuilder.setDocs(true)).thenReturn(mockIndicesStatsBuilder);
|
|
||||||
|
|
||||||
assert statsResponse.getPrimaries().getDocs().getCount() == 500L;
|
|
||||||
assert statsResponse.getTotal().getDocs().getCount() == (total + total);
|
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
Object[] args = invocation.getArguments();
|
Object[] args = invocation.getArguments();
|
||||||
assert args.length == 1;
|
assert args.length == 3;
|
||||||
ActionListener<IndicesStatsResponse> listener = (ActionListener<IndicesStatsResponse>) args[0];
|
ActionListener<IndicesStatsResponse> listener = (ActionListener<IndicesStatsResponse>) args[2];
|
||||||
listener.onResponse(statsResponse);
|
listener.onResponse(statsResponse);
|
||||||
return null;
|
return null;
|
||||||
}).when(mockIndicesStatsBuilder).execute(any(ActionListener.class));
|
}).when(mockClient).execute(eq(IndicesStatsAction.INSTANCE), any(ActionRequest.class), any(ActionListener.class));
|
||||||
|
|
||||||
|
assert statsResponse.getPrimaries().getDocs().getCount() == 500L;
|
||||||
|
assert statsResponse.getTotal().getDocs().getCount() == (total + total);
|
||||||
|
|
||||||
final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("logs-index-000001")
|
final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("logs-index-000001")
|
||||||
.putAlias(AliasMetaData.builder("logs-alias").writeIndex(false).build()).settings(settings(Version.CURRENT))
|
.putAlias(AliasMetaData.builder("logs-alias").writeIndex(false).build()).settings(settings(Version.CURRENT))
|
||||||
|
@ -422,7 +419,7 @@ public class TransportRolloverActionTests extends ESTestCase {
|
||||||
RolloverRequest rolloverRequest = new RolloverRequest("logs-alias", "logs-index-000003");
|
RolloverRequest rolloverRequest = new RolloverRequest("logs-alias", "logs-index-000003");
|
||||||
rolloverRequest.addMaxIndexDocsCondition(500L);
|
rolloverRequest.addMaxIndexDocsCondition(500L);
|
||||||
rolloverRequest.dryRun(true);
|
rolloverRequest.dryRun(true);
|
||||||
transportRolloverAction.masterOperation(rolloverRequest, stateBefore, future);
|
transportRolloverAction.masterOperation(mock(Task.class), rolloverRequest, stateBefore, future);
|
||||||
|
|
||||||
RolloverResponse response = future.actionGet();
|
RolloverResponse response = future.actionGet();
|
||||||
assertThat(response.getOldIndex(), equalTo("logs-index-000002"));
|
assertThat(response.getOldIndex(), equalTo("logs-index-000002"));
|
||||||
|
@ -438,7 +435,7 @@ public class TransportRolloverActionTests extends ESTestCase {
|
||||||
rolloverRequest = new RolloverRequest("logs-alias", "logs-index-000003");
|
rolloverRequest = new RolloverRequest("logs-alias", "logs-index-000003");
|
||||||
rolloverRequest.addMaxIndexDocsCondition(300L);
|
rolloverRequest.addMaxIndexDocsCondition(300L);
|
||||||
rolloverRequest.dryRun(true);
|
rolloverRequest.dryRun(true);
|
||||||
transportRolloverAction.masterOperation(rolloverRequest, stateBefore, future);
|
transportRolloverAction.masterOperation(mock(Task.class), rolloverRequest, stateBefore, future);
|
||||||
|
|
||||||
response = future.actionGet();
|
response = future.actionGet();
|
||||||
assertThat(response.getOldIndex(), equalTo("logs-index-000002"));
|
assertThat(response.getOldIndex(), equalTo("logs-index-000002"));
|
||||||
|
|
Loading…
Reference in New Issue