Adds ForceMerge action to Index Lifecycle (#3309)

add forcemerge index-lifecycle action
This commit is contained in:
Tal Levy 2017-12-21 11:59:44 -08:00 committed by GitHub
parent d5609a408b
commit e433b1eaa8
4 changed files with 528 additions and 13 deletions

View File

@ -6,40 +6,75 @@
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
/**
* A {@link LifecycleAction} which force-merges the index's shards to a specific number of segments.
* A {@link LifecycleAction} which force-merges the index.
*/
public class ForceMergeAction implements LifecycleAction {
public static final String NAME = "forcemerge";
public static final ParseField MAX_NUM_SEGMENTS_FIELD = new ParseField("max_num_segments");
private static final Logger logger = ESLoggerFactory.getLogger(ForceMergeAction.class);
private static final ObjectParser<ForceMergeAction, Void> PARSER = new ObjectParser<>(NAME, ForceMergeAction::new);
private static final ConstructingObjectParser<ForceMergeAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
false, a -> {
int maxNumSegments = (Integer) a[0];
return new ForceMergeAction(maxNumSegments);
});
static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_NUM_SEGMENTS_FIELD);
}
private final Integer maxNumSegments;
public static ForceMergeAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}
public ForceMergeAction() {
public ForceMergeAction(int maxNumSegments) {
if (maxNumSegments <= 0) {
throw new IllegalArgumentException("[" + MAX_NUM_SEGMENTS_FIELD.getPreferredName()
+ "] must be a positive integer");
}
this.maxNumSegments = maxNumSegments;
}
public ForceMergeAction(StreamInput in) throws IOException {
this.maxNumSegments = in.readVInt();
}
public int getMaxNumSegments() {
return maxNumSegments;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxNumSegments);
}
@Override
@ -50,19 +85,111 @@ public class ForceMergeAction implements LifecycleAction {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MAX_NUM_SEGMENTS_FIELD.getPreferredName(), maxNumSegments);
builder.endObject();
return builder;
}
/**
* Helper method to check if a force-merge is necessary based on {@code maxNumSegments} and then calls
* the next {@code action}.
*
* @param index The specific index to check the segments of
* @param client The client to execute the transport actions
* @param listener The listener to call onFailure on if an exception occurs when executing the {@link IndicesSegmentsRequest}
* @param nextAction The next action to execute if there are too many segments and force-merge is appropriate
* @param skipToAction The next action to execute if there aren't too many segments
*/
void checkSegments(Index index, Client client, Listener listener, Consumer<ActionResponse> nextAction,
Consumer<ActionResponse> skipToAction) {
client.admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(r -> {
boolean hasTooManySegments = StreamSupport.stream(r.getIndices().get(index.getName()).spliterator(), false)
.anyMatch(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments));
if (nextAction != null && hasTooManySegments && RestStatus.OK.equals(r.getStatus())) {
nextAction.accept(r);
} else {
skipToAction.accept(r);
}
}, listener::onFailure));
}
/**
* Helper method to execute the force-merge
*
* @param index The specific index to force-merge
* @param client The client to execute the transport actions
* @param listener The listener to call onFailure on if an exception occurs when executing the {@link ForceMergeRequest}
* @param nextAction The next action to execute if the force-merge is successful
*/
void forceMerge(Index index, Client client, Listener listener, Consumer<ActionResponse> nextAction) {
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(index.getName()).maxNumSegments(maxNumSegments);
client.admin().indices().forceMerge(forceMergeRequest, ActionListener.wrap(r -> {
if (RestStatus.OK.equals(r.getStatus())) {
nextAction.accept(r);
}
}, listener::onFailure));
}
/**
* Helper method to prepare the index for force-merging by making it read-only
*
* @param index The specific index to set as read-only
* @param client The client to execute the transport actions
* @param listener The listener to call onFailure on if an exception occurs when executing the {@link UpdateSettingsRequest}
* @param nextAction The next action to execute if updating the setting is successful
*/
void updateBlockWriteSettingToReadOnly(Index index, Client client, Listener listener, Consumer<ActionResponse> nextAction) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder()
.put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(), index.getName());
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
nextAction.accept(response);
}, listener::onFailure));
}
/**
* Helper method to return the index back to read-write mode since force-merging was successful
*
* @param index The specific index to set back as read-write
* @param client The client to execute the transport actions
* @param listener The listener to return a final response to for this {@link ForceMergeAction}.
*/
void updateBlockWriteSettingToReadWrite(Index index, Client client, Listener listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder()
.put(IndexMetaData.SETTING_BLOCKS_WRITE, false).build(), index.getName());
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
response -> listener.onSuccess(true), listener::onFailure));
}
@Override
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// NORELEASE: stub
listener.onSuccess(true);
boolean isReadOnly = clusterService.state().metaData().indices().get(index.getName()).getSettings()
.getAsBoolean(IndexMetaData.SETTING_BLOCKS_WRITE, false);
if (isReadOnly) {
// index is already read-only, so just check if a force-merge is necessary and set back
// to read-write whether a force-merge is necessary or not.
checkSegments(index, client, listener, r1 ->
forceMerge(index, client, listener,
// after a successful force-merge, return the index to read-write
r2 -> updateBlockWriteSettingToReadWrite(index, client, listener)),
r3 -> updateBlockWriteSettingToReadWrite(index, client, listener));
} else {
// first check if a force-merge is appropriate
checkSegments(index, client, listener,
// if appropriate, set the index to read-only
r1 -> updateBlockWriteSettingToReadOnly(index, client, listener,
// once the index is read-only, run a force-merge on it
r2 -> forceMerge(index, client, listener,
// after a successful force-merge, return the index to read-write
r3 -> updateBlockWriteSettingToReadWrite(index, client, listener))),
r4 -> { if (isReadOnly) updateBlockWriteSettingToReadWrite(index, client, listener); });
}
}
@Override
public int hashCode() {
return 1;
return Objects.hash(maxNumSegments);
}
@Override
@ -73,12 +200,12 @@ public class ForceMergeAction implements LifecycleAction {
if (obj.getClass() != getClass()) {
return false;
}
return true;
ForceMergeAction other = (ForceMergeAction) obj;
return Objects.equals(maxNumSegments, other.maxNumSegments);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -5,11 +5,47 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener;
import org.mockito.InOrder;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
public class ForceMergeActionTests extends AbstractSerializingTestCase<ForceMergeAction> {
@ -20,11 +56,357 @@ public class ForceMergeActionTests extends AbstractSerializingTestCase<ForceMerg
@Override
protected ForceMergeAction createTestInstance() {
return new ForceMergeAction();
return new ForceMergeAction(randomIntBetween(1, 100));
}
@Override
protected ForceMergeAction mutateInstance(ForceMergeAction instance) {
return new ForceMergeAction(instance.getMaxNumSegments() + randomIntBetween(1, 10));
}
@Override
protected Reader<ForceMergeAction> instanceReader() {
return ForceMergeAction::new;
}
public void testMissingMaxNumSegments() throws IOException {
BytesReference emptyObject = JsonXContent.contentBuilder().startObject().endObject().bytes();
XContentParser parser = XContentHelper.createParser(null, emptyObject, XContentType.JSON);
Exception e = expectThrows(IllegalArgumentException.class, () -> ForceMergeAction.parse(parser));
assertThat(e.getMessage(), equalTo("Required [max_num_segments]"));
}
public void testInvalidNegativeSegmentNumber() {
Exception r = expectThrows(IllegalArgumentException.class, () -> new ForceMergeAction(randomIntBetween(-10, 0)));
assertThat(r.getMessage(), equalTo("[max_num_segments] must be a positive integer"));
}
public void testExecuteSuccessfully() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
UpdateSettingsResponse updateSettingsResponse = Mockito.mock(UpdateSettingsResponse.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = Arrays.asList(null, null);
ForceMergeResponse forceMergeResponse = Mockito.mock(ForceMergeResponse.class);
Mockito.when(forceMergeResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments));
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
int maxNumSegments = 1;
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(updateSettingsResponse);
return null;
}).when(indicesClient).updateSettings(any(), any());
Mockito.doAnswer(invocationOnMock -> {
ForceMergeRequest request = (ForceMergeRequest) invocationOnMock.getArguments()[0];
assertThat(request.maxNumSegments(), equalTo(maxNumSegments));
@SuppressWarnings("unchecked")
ActionListener<ForceMergeResponse> listener = (ActionListener<ForceMergeResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(forceMergeResponse);
return null;
}).when(indicesClient).forceMerge(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ForceMergeAction action = new ForceMergeAction(maxNumSegments);
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(true, actionCompleted.get());
Mockito.verify(client, Mockito.atLeast(1)).admin();
Mockito.verify(adminClient, Mockito.atLeast(1)).indices();
Mockito.verify(indicesClient, Mockito.atLeast(1)).segments(any(), any());
Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any());
Mockito.verify(indicesClient, Mockito.atLeast(1)).forceMerge(any(), any());
Mockito.verify(clusterService, Mockito.atLeast(1)).state();
}
public void testExecuteWhenReadOnlyAlready() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
boolean isReadOnlyAlready = true;
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_BLOCKS_WRITE, isReadOnlyAlready))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = Arrays.asList(null, null);
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments));
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
int maxNumSegments = Integer.MAX_VALUE;
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ForceMergeAction action = new ForceMergeAction(maxNumSegments);
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(null, actionCompleted.get());
InOrder inOrder = Mockito.inOrder(clusterService, client, adminClient, indicesClient);
inOrder.verify(clusterService).state();
inOrder.verify(client).admin();
inOrder.verify(adminClient).indices();
inOrder.verify(indicesClient).segments(any(), any());
inOrder.verify(indicesClient).updateSettings(any(), any());
Mockito.verify(indicesClient, Mockito.never()).forceMerge(any(), any());
}
public void testExecuteWithNoNeedToForceMerge() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
boolean isReadOnlyAlready = false;
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_BLOCKS_WRITE, isReadOnlyAlready))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = Arrays.asList(null, null);
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments));
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
int maxNumSegments = Integer.MAX_VALUE;
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ForceMergeAction action = new ForceMergeAction(maxNumSegments);
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(null, actionCompleted.get());
InOrder inOrder = Mockito.inOrder(clusterService, client, adminClient, indicesClient);
inOrder.verify(clusterService).state();
inOrder.verify(client).admin();
inOrder.verify(adminClient).indices();
inOrder.verify(indicesClient).segments(any(), any());
Mockito.verify(indicesClient, Mockito.never()).updateSettings(any(), any());
Mockito.verify(indicesClient, Mockito.never()).forceMerge(any(), any());
}
public void testCheckSegmentsNeedsMerging() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = Arrays.asList(null, null);
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments));
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
int maxNumSegments = 1;
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());
SetOnce<Boolean> nextActionCalled = new SetOnce<>();
ForceMergeAction action = new ForceMergeAction(maxNumSegments);
action.checkSegments(index, client, new Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call to onSuccess");
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
}, r -> nextActionCalled.set(true), r2 -> {throw new AssertionError("unexpected call to action");});
assertEquals(true, nextActionCalled.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).segments(any(), any());
}
public void testCheckSegmentsNoNeedToMerge() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = Arrays.asList(null, null);
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), indexSegments));
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
int maxNumSegments = randomIntBetween(2, Integer.MAX_VALUE);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());
SetOnce<Boolean> skipActionCalled = new SetOnce<>();
ForceMergeAction action = new ForceMergeAction(maxNumSegments);
action.checkSegments(index, client, new Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call to onSuccess");
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
}, r -> { throw new AssertionError("next action should not be called"); },
r2 -> skipActionCalled.set(true));
assertTrue(skipActionCalled.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).segments(any(), any());
}
}

View File

@ -26,6 +26,7 @@ import org.junit.Before;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
@ -88,9 +89,14 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
public void init() {
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.lifecycle.name", "test").build();
Map<String, Phase> phases = new HashMap<>();
Map<String, LifecycleAction> warmPhaseActions = Collections.singletonMap(ForceMergeAction.NAME,
new ForceMergeAction(-1));
phases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(2), warmPhaseActions));
Map<String, LifecycleAction> deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
Map<String, Phase> phases = Collections.singletonMap("delete", new Phase("delete",
TimeValue.timeValueSeconds(3), deletePhaseActions));
phases.put("delete", new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions));
lifecyclePolicy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, "test", phases);
}

View File

@ -30,7 +30,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction();
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction();
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(-1);
private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1);
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction("", new ByteSizeValue(1), null, null);
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction();