diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeAction.java index 297783e331b..e8c1fd7b5ac 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeAction.java @@ -6,40 +6,75 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.StreamSupport; /** - * A {@link LifecycleAction} which force-merges the index's shards to a specific number of segments. + * A {@link LifecycleAction} which force-merges the index. */ public class ForceMergeAction implements LifecycleAction { public static final String NAME = "forcemerge"; + public static final ParseField MAX_NUM_SEGMENTS_FIELD = new ParseField("max_num_segments"); private static final Logger logger = ESLoggerFactory.getLogger(ForceMergeAction.class); - private static final ObjectParser PARSER = new ObjectParser<>(NAME, ForceMergeAction::new); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + false, a -> { + int maxNumSegments = (Integer) a[0]; + return new ForceMergeAction(maxNumSegments); + }); + + static { + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_NUM_SEGMENTS_FIELD); + } + + private final Integer maxNumSegments; public static ForceMergeAction parse(XContentParser parser) { return PARSER.apply(parser, null); } - public ForceMergeAction() { + public ForceMergeAction(int maxNumSegments) { + if (maxNumSegments <= 0) { + throw new IllegalArgumentException("[" + MAX_NUM_SEGMENTS_FIELD.getPreferredName() + + "] must be a positive integer"); + } + this.maxNumSegments = maxNumSegments; } public ForceMergeAction(StreamInput in) throws IOException { + this.maxNumSegments = in.readVInt(); + } + + public int getMaxNumSegments() { + return maxNumSegments; } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(maxNumSegments); } @Override @@ -50,19 +85,111 @@ public class ForceMergeAction implements LifecycleAction { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field(MAX_NUM_SEGMENTS_FIELD.getPreferredName(), maxNumSegments); builder.endObject(); return builder; } + /** + * Helper method to check if a force-merge is necessary based on {@code maxNumSegments} and then calls + * the next {@code action}. + * + * @param index The specific index to check the segments of + * @param client The client to execute the transport actions + * @param listener The listener to call onFailure on if an exception occurs when executing the {@link IndicesSegmentsRequest} + * @param nextAction The next action to execute if there are too many segments and force-merge is appropriate + * @param skipToAction The next action to execute if there aren't too many segments + */ + void checkSegments(Index index, Client client, Listener listener, Consumer nextAction, + Consumer skipToAction) { + client.admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(r -> { + boolean hasTooManySegments = StreamSupport.stream(r.getIndices().get(index.getName()).spliterator(), false) + .anyMatch(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)); + if (nextAction != null && hasTooManySegments && RestStatus.OK.equals(r.getStatus())) { + nextAction.accept(r); + } else { + skipToAction.accept(r); + } + }, listener::onFailure)); + + } + + /** + * Helper method to execute the force-merge + * + * @param index The specific index to force-merge + * @param client The client to execute the transport actions + * @param listener The listener to call onFailure on if an exception occurs when executing the {@link ForceMergeRequest} + * @param nextAction The next action to execute if the force-merge is successful + */ + void forceMerge(Index index, Client client, Listener listener, Consumer nextAction) { + ForceMergeRequest forceMergeRequest = new ForceMergeRequest(index.getName()).maxNumSegments(maxNumSegments); + client.admin().indices().forceMerge(forceMergeRequest, ActionListener.wrap(r -> { + if (RestStatus.OK.equals(r.getStatus())) { + nextAction.accept(r); + } + }, listener::onFailure)); + + } + + /** + * Helper method to prepare the index for force-merging by making it read-only + * + * @param index The specific index to set as read-only + * @param client The client to execute the transport actions + * @param listener The listener to call onFailure on if an exception occurs when executing the {@link UpdateSettingsRequest} + * @param nextAction The next action to execute if updating the setting is successful + */ + void updateBlockWriteSettingToReadOnly(Index index, Client client, Listener listener, Consumer nextAction) { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(), index.getName()); + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> { + nextAction.accept(response); + }, listener::onFailure)); + } + + /** + * Helper method to return the index back to read-write mode since force-merging was successful + * + * @param index The specific index to set back as read-write + * @param client The client to execute the transport actions + * @param listener The listener to return a final response to for this {@link ForceMergeAction}. + */ + void updateBlockWriteSettingToReadWrite(Index index, Client client, Listener listener) { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put(IndexMetaData.SETTING_BLOCKS_WRITE, false).build(), index.getName()); + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap( + response -> listener.onSuccess(true), listener::onFailure)); + } + @Override public void execute(Index index, Client client, ClusterService clusterService, Listener listener) { - // NORELEASE: stub - listener.onSuccess(true); + boolean isReadOnly = clusterService.state().metaData().indices().get(index.getName()).getSettings() + .getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false); + if (isReadOnly) { + // index is already read-only, so just check if a force-merge is necessary and set back + // to read-write whether a force-merge is necessary or not. + checkSegments(index, client, listener, r1 -> + forceMerge(index, client, listener, + // after a successful force-merge, return the index to read-write + r2 -> updateBlockWriteSettingToReadWrite(index, client, listener)), + r3 -> updateBlockWriteSettingToReadWrite(index, client, listener)); + } else { + // first check if a force-merge is appropriate + checkSegments(index, client, listener, + // if appropriate, set the index to read-only + r1 -> updateBlockWriteSettingToReadOnly(index, client, listener, + // once the index is read-only, run a force-merge on it + r2 -> forceMerge(index, client, listener, + // after a successful force-merge, return the index to read-write + r3 -> updateBlockWriteSettingToReadWrite(index, client, listener))), + r4 -> { if (isReadOnly) updateBlockWriteSettingToReadWrite(index, client, listener); }); + } } @Override public int hashCode() { - return 1; + return Objects.hash(maxNumSegments); } @Override @@ -73,12 +200,12 @@ public class ForceMergeAction implements LifecycleAction { if (obj.getClass() != getClass()) { return false; } - return true; + ForceMergeAction other = (ForceMergeAction) obj; + return Objects.equals(maxNumSegments, other.maxNumSegments); } @Override public String toString() { return Strings.toString(this); } - } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeActionTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeActionTests.java index ac17d801ca4..4e514183590 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeActionTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ForceMergeActionTests.java @@ -5,11 +5,47 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.action.admin.indices.segments.IndexSegments; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener; +import org.mockito.InOrder; +import org.mockito.Mockito; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Spliterator; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; public class ForceMergeActionTests extends AbstractSerializingTestCase { @@ -20,11 +56,357 @@ public class ForceMergeActionTests extends AbstractSerializingTestCase instanceReader() { return ForceMergeAction::new; } + + public void testMissingMaxNumSegments() throws IOException { + BytesReference emptyObject = JsonXContent.contentBuilder().startObject().endObject().bytes(); + XContentParser parser = XContentHelper.createParser(null, emptyObject, XContentType.JSON); + Exception e = expectThrows(IllegalArgumentException.class, () -> ForceMergeAction.parse(parser)); + assertThat(e.getMessage(), equalTo("Required [max_num_segments]")); + } + + public void testInvalidNegativeSegmentNumber() { + Exception r = expectThrows(IllegalArgumentException.class, () -> new ForceMergeAction(randomIntBetween(-10, 0))); + assertThat(r.getMessage(), equalTo("[max_num_segments] must be a positive integer")); + } + + public void testExecuteSuccessfully() throws Exception { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + UpdateSettingsResponse updateSettingsResponse = Mockito.mock(UpdateSettingsResponse.class); + IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class); + IndexSegments indexSegments = Mockito.mock(IndexSegments.class); + IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class); + Map indexShards = Collections.singletonMap(0, indexShardSegments); + ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class); + ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne }; + Spliterator iss = indexShards.values().spliterator(); + List segments = Arrays.asList(null, null); + ForceMergeResponse forceMergeResponse = Mockito.mock(ForceMergeResponse.class); + Mockito.when(forceMergeResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments)); + Mockito.when(indexSegments.spliterator()).thenReturn(iss); + Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray); + Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments); + int maxNumSegments = 1; + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(indicesSegmentResponse); + return null; + }).when(indicesClient).segments(any(), any()); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(updateSettingsResponse); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + Mockito.doAnswer(invocationOnMock -> { + ForceMergeRequest request = (ForceMergeRequest) invocationOnMock.getArguments()[0]; + assertThat(request.maxNumSegments(), equalTo(maxNumSegments)); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(forceMergeResponse); + return null; + }).when(indicesClient).forceMerge(any(), any()); + + SetOnce actionCompleted = new SetOnce<>(); + ForceMergeAction action = new ForceMergeAction(maxNumSegments); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(true, actionCompleted.get()); + + Mockito.verify(client, Mockito.atLeast(1)).admin(); + Mockito.verify(adminClient, Mockito.atLeast(1)).indices(); + Mockito.verify(indicesClient, Mockito.atLeast(1)).segments(any(), any()); + Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any()); + Mockito.verify(indicesClient, Mockito.atLeast(1)).forceMerge(any(), any()); + Mockito.verify(clusterService, Mockito.atLeast(1)).state(); + } + + public void testExecuteWhenReadOnlyAlready() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + boolean isReadOnlyAlready = true; + + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_BLOCKS_WRITE, isReadOnlyAlready)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class); + IndexSegments indexSegments = Mockito.mock(IndexSegments.class); + IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class); + Map indexShards = Collections.singletonMap(0, indexShardSegments); + ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class); + ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne }; + Spliterator iss = indexShards.values().spliterator(); + List segments = Arrays.asList(null, null); + + Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments)); + Mockito.when(indexSegments.spliterator()).thenReturn(iss); + Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray); + Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments); + int maxNumSegments = Integer.MAX_VALUE; + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(indicesSegmentResponse); + return null; + }).when(indicesClient).segments(any(), any()); + + + SetOnce actionCompleted = new SetOnce<>(); + ForceMergeAction action = new ForceMergeAction(maxNumSegments); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(null, actionCompleted.get()); + + InOrder inOrder = Mockito.inOrder(clusterService, client, adminClient, indicesClient); + inOrder.verify(clusterService).state(); + inOrder.verify(client).admin(); + inOrder.verify(adminClient).indices(); + inOrder.verify(indicesClient).segments(any(), any()); + inOrder.verify(indicesClient).updateSettings(any(), any()); + Mockito.verify(indicesClient, Mockito.never()).forceMerge(any(), any()); + } + + public void testExecuteWithNoNeedToForceMerge() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + boolean isReadOnlyAlready = false; + + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_BLOCKS_WRITE, isReadOnlyAlready)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class); + IndexSegments indexSegments = Mockito.mock(IndexSegments.class); + IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class); + Map indexShards = Collections.singletonMap(0, indexShardSegments); + ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class); + ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne }; + Spliterator iss = indexShards.values().spliterator(); + List segments = Arrays.asList(null, null); + + Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments)); + Mockito.when(indexSegments.spliterator()).thenReturn(iss); + Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray); + Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments); + int maxNumSegments = Integer.MAX_VALUE; + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(indicesSegmentResponse); + return null; + }).when(indicesClient).segments(any(), any()); + + + SetOnce actionCompleted = new SetOnce<>(); + ForceMergeAction action = new ForceMergeAction(maxNumSegments); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(null, actionCompleted.get()); + + InOrder inOrder = Mockito.inOrder(clusterService, client, adminClient, indicesClient); + inOrder.verify(clusterService).state(); + inOrder.verify(client).admin(); + inOrder.verify(adminClient).indices(); + inOrder.verify(indicesClient).segments(any(), any()); + Mockito.verify(indicesClient, Mockito.never()).updateSettings(any(), any()); + Mockito.verify(indicesClient, Mockito.never()).forceMerge(any(), any()); + } + + public void testCheckSegmentsNeedsMerging() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class); + IndexSegments indexSegments = Mockito.mock(IndexSegments.class); + IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class); + Map indexShards = Collections.singletonMap(0, indexShardSegments); + ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class); + ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne }; + Spliterator iss = indexShards.values().spliterator(); + List segments = Arrays.asList(null, null); + Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments)); + Mockito.when(indexSegments.spliterator()).thenReturn(iss); + Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray); + Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments); + int maxNumSegments = 1; + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(indicesSegmentResponse); + return null; + }).when(indicesClient).segments(any(), any()); + + SetOnce nextActionCalled = new SetOnce<>(); + ForceMergeAction action = new ForceMergeAction(maxNumSegments); + action.checkSegments(index, client, new Listener() { + + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call to onSuccess"); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }, r -> nextActionCalled.set(true), r2 -> {throw new AssertionError("unexpected call to action");}); + + assertEquals(true, nextActionCalled.get()); + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).segments(any(), any()); + } + + public void testCheckSegmentsNoNeedToMerge() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class); + IndexSegments indexSegments = Mockito.mock(IndexSegments.class); + IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class); + Map indexShards = Collections.singletonMap(0, indexShardSegments); + ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class); + ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne }; + Spliterator iss = indexShards.values().spliterator(); + List segments = Arrays.asList(null, null); + Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK); + Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments)); + Mockito.when(indexSegments.spliterator()).thenReturn(iss); + Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray); + Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments); + int maxNumSegments = randomIntBetween(2, Integer.MAX_VALUE); + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(indicesSegmentResponse); + return null; + }).when(indicesClient).segments(any(), any()); + + SetOnce skipActionCalled = new SetOnce<>(); + ForceMergeAction action = new ForceMergeAction(maxNumSegments); + action.checkSegments(index, client, new Listener() { + + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call to onSuccess"); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }, r -> { throw new AssertionError("next action should not be called"); }, + r2 -> skipActionCalled.set(true)); + + assertTrue(skipActionCalled.get()); + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).segments(any(), any()); + } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java index c64dea8b9f7..3570cbdac54 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java @@ -26,6 +26,7 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.client.Requests.clusterHealthRequest; @@ -88,9 +89,14 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase { public void init() { settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.lifecycle.name", "test").build(); + Map phases = new HashMap<>(); + + Map warmPhaseActions = Collections.singletonMap(ForceMergeAction.NAME, + new ForceMergeAction(-1)); + phases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(2), warmPhaseActions)); + Map deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); - Map phases = Collections.singletonMap("delete", new Phase("delete", - TimeValue.timeValueSeconds(3), deletePhaseActions)); + phases.put("delete", new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions)); lifecyclePolicy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, "test", phases); } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java index a15067dc627..5654ff78558 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java @@ -30,7 +30,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction(); private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction(); - private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(); + private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(-1); private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1); private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction("", new ByteSizeValue(1), null, null); private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction();