[ILM] Add unfollow action (#36970)
This change adds the unfollow action for CCR follower indices. This is needed for the shrink action in case an index is a follower index. This will give the follower index the opportunity to fully catch up with the leader index, pause index following and unfollow the leader index. After this the shrink action can safely perform the ilm shrink. The unfollow action needs to be added to the hot phase and acts as barrier for going to the next phase (warm or delete phases), so that follower indices are being unfollowed properly before indices are expected to go in read-only mode. This allows the force merge action to execute its steps safely. The unfollow action has three steps: * `wait-for-indexing-complete` step: waits for the index in question to get the `index.lifecycle.indexing_complete` setting be set to `true` * `wait-for-follow-shard-tasks` step: waits for all the shard follow tasks for the index being handled to report that the leader shard global checkpoint is equal to the follower shard global checkpoint. * `pause-follower-index` step: Pauses index following, necessary to unfollow * `close-follower-index` step: Closes the index, necessary to unfollow * `unfollow-follower-index` step: Actually unfollows the index using the CCR Unfollow API * `open-follower-index` step: Reopens the index now that it is a normal index * `wait-for-yellow` step: Waits for primary shards to be allocated after reopening the index to ensure the index is ready for the next step In the case of the last two steps, if the index in being handled is a regular index then the steps acts as a no-op. Relates to #34648 Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com> Co-authored-by: Gordon Brown <gordon.brown@elastic.co>
This commit is contained in:
parent
a2bdfb9041
commit
a3030c51e2
|
@ -56,7 +56,10 @@ public class IndexLifecycleNamedXContentProvider implements NamedXContentProvide
|
|||
FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(SetPriorityAction.NAME),
|
||||
SetPriorityAction::parse)
|
||||
SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(UnfollowAction.NAME),
|
||||
UnfollowAction::parse)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,10 +57,10 @@ public class LifecyclePolicy implements ToXContentObject {
|
|||
throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported");
|
||||
}, PHASES_FIELD);
|
||||
|
||||
ALLOWED_ACTIONS.put("hot", Sets.newHashSet(SetPriorityAction.NAME, RolloverAction.NAME));
|
||||
ALLOWED_ACTIONS.put("warm", Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME,
|
||||
ALLOWED_ACTIONS.put("hot", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME));
|
||||
ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME,
|
||||
ReadOnlyAction.NAME, ShrinkAction.NAME));
|
||||
ALLOWED_ACTIONS.put("cold", Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME));
|
||||
ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME));
|
||||
ALLOWED_ACTIONS.put("delete", Sets.newHashSet(DeleteAction.NAME));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class UnfollowAction implements LifecycleAction, ToXContentObject {
|
||||
public static final String NAME = "unfollow";
|
||||
|
||||
private static final ObjectParser<UnfollowAction, Void> PARSER = new ObjectParser<>(NAME, UnfollowAction::new);
|
||||
|
||||
public UnfollowAction() {}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public static UnfollowAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 36970;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.client.indexlifecycle.RolloverAction;
|
|||
import org.elasticsearch.client.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.client.indexlifecycle.StartILMRequest;
|
||||
import org.elasticsearch.client.indexlifecycle.StopILMRequest;
|
||||
import org.elasticsearch.client.indexlifecycle.UnfollowAction;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -144,19 +145,20 @@ public class IndexLifecycleIT extends ESRestHighLevelClientTestCase {
|
|||
|
||||
public void testExplainLifecycle() throws Exception {
|
||||
Map<String, Phase> lifecyclePhases = new HashMap<>();
|
||||
Map<String, LifecycleAction> hotActions = Collections.singletonMap(
|
||||
RolloverAction.NAME,
|
||||
new RolloverAction(null, TimeValue.timeValueHours(50 * 24), null));
|
||||
Map<String, LifecycleAction> hotActions = new HashMap<>();
|
||||
hotActions.put(RolloverAction.NAME, new RolloverAction(null, TimeValue.timeValueHours(50 * 24), null));
|
||||
Phase hotPhase = new Phase("hot", randomFrom(TimeValue.ZERO, null), hotActions);
|
||||
lifecyclePhases.put("hot", hotPhase);
|
||||
|
||||
Map<String, LifecycleAction> warmActions = new HashMap<>();
|
||||
warmActions.put(UnfollowAction.NAME, new UnfollowAction());
|
||||
warmActions.put(AllocateAction.NAME, new AllocateAction(null, null, null, Collections.singletonMap("_name", "node-1")));
|
||||
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
|
||||
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1000));
|
||||
lifecyclePhases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(1000), warmActions));
|
||||
|
||||
Map<String, LifecycleAction> coldActions = new HashMap<>();
|
||||
coldActions.put(UnfollowAction.NAME, new UnfollowAction());
|
||||
coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, null));
|
||||
lifecyclePhases.put("cold", new Phase("cold", TimeValue.timeValueSeconds(2000), coldActions));
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.elasticsearch.client.indexlifecycle.ReadOnlyAction;
|
|||
import org.elasticsearch.client.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.client.indexlifecycle.SetPriorityAction;
|
||||
import org.elasticsearch.client.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.client.indexlifecycle.UnfollowAction;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -645,7 +646,7 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
|
||||
public void testProvidedNamedXContents() {
|
||||
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
|
||||
assertEquals(19, namedXContents.size());
|
||||
assertEquals(20, namedXContents.size());
|
||||
Map<Class<?>, Integer> categories = new HashMap<>();
|
||||
List<String> names = new ArrayList<>();
|
||||
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
|
||||
|
@ -669,7 +670,8 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
assertTrue(names.contains(MeanReciprocalRank.NAME));
|
||||
assertTrue(names.contains(DiscountedCumulativeGain.NAME));
|
||||
assertTrue(names.contains(ExpectedReciprocalRank.NAME));
|
||||
assertEquals(Integer.valueOf(8), categories.get(LifecycleAction.class));
|
||||
assertEquals(Integer.valueOf(9), categories.get(LifecycleAction.class));
|
||||
assertTrue(names.contains(UnfollowAction.NAME));
|
||||
assertTrue(names.contains(AllocateAction.NAME));
|
||||
assertTrue(names.contains(DeleteAction.NAME));
|
||||
assertTrue(names.contains(ForceMergeAction.NAME));
|
||||
|
|
|
@ -68,7 +68,8 @@ public class GetLifecyclePolicyResponseTests extends AbstractXContentTestCase<Ge
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
|
|
@ -64,7 +64,8 @@ public class LifecyclePolicyMetadataTests extends AbstractXContentTestCase<Lifec
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
|
|
@ -39,10 +39,11 @@ import java.util.stream.Collectors;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class LifecyclePolicyTests extends AbstractXContentTestCase<LifecyclePolicy> {
|
||||
private static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, RolloverAction.NAME);
|
||||
private static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME,
|
||||
private static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME);
|
||||
private static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME,
|
||||
ForceMergeAction.NAME, ReadOnlyAction.NAME, ShrinkAction.NAME);
|
||||
private static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME);
|
||||
private static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME);
|
||||
private static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(DeleteAction.NAME);
|
||||
|
||||
private String lifecycleName;
|
||||
|
@ -68,7 +69,8 @@ public class LifecyclePolicyTests extends AbstractXContentTestCase<LifecyclePoli
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
@ -213,6 +215,8 @@ public class LifecyclePolicyTests extends AbstractXContentTestCase<LifecyclePoli
|
|||
return new FreezeAction();
|
||||
case SetPriorityAction.NAME:
|
||||
return SetPriorityActionTests.randomInstance();
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action [" + action + "]");
|
||||
}};
|
||||
|
@ -246,6 +250,8 @@ public class LifecyclePolicyTests extends AbstractXContentTestCase<LifecyclePoli
|
|||
return new FreezeAction();
|
||||
case SetPriorityAction.NAME:
|
||||
return SetPriorityActionTests.randomInstance();
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported phase action [" + actionName + "]");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class UnfollowActionTests extends AbstractXContentTestCase<UnfollowAction> {
|
||||
|
||||
@Override
|
||||
protected UnfollowAction createTestInstance() {
|
||||
return new UnfollowAction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UnfollowAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return UnfollowAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -87,16 +87,19 @@ The below list shows the actions which are available in each phase.
|
|||
* Hot
|
||||
- <<ilm-set-priority-action,Set Priority>>
|
||||
- <<ilm-rollover-action,Rollover>>
|
||||
- <<ilm-unfollow-action,Unfollow>>
|
||||
* Warm
|
||||
- <<ilm-set-priority-action,Set Priority>>
|
||||
- <<ilm-allocate-action,Allocate>>
|
||||
- <<ilm-readonly-action,Read-Only>>
|
||||
- <<ilm-forcemerge-action,Force Merge>>
|
||||
- <<ilm-shrink-action,Shrink>>
|
||||
- <<ilm-unfollow-action,Unfollow>>
|
||||
* Cold
|
||||
- <<ilm-set-priority-action,Set Priority>>
|
||||
- <<ilm-allocate-action,Allocate>>
|
||||
- <<ilm-freeze-action,Freeze>>
|
||||
- <<ilm-unfollow-action,Unfollow>>
|
||||
* Delete
|
||||
- <<ilm-delete-action,Delete>>
|
||||
|
||||
|
@ -616,6 +619,43 @@ PUT _ilm/policy/my_policy
|
|||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
[[ilm-unfollow-action]]
|
||||
==== Unfollow
|
||||
|
||||
This action turns a {ref}/ccr-apis.html[ccr] follower index
|
||||
into a regular index. This can be desired when moving follower
|
||||
indices into the next phase. Also certain actions like shrink
|
||||
and rollover can then be performed safely on follower indices.
|
||||
|
||||
If the unfollow action encounters a follower index then
|
||||
the following operations will be performed on it:
|
||||
|
||||
* Pauses indexing following for the follower index.
|
||||
* Closes the follower index.
|
||||
* Unfollows the follower index.
|
||||
* Opens the follower index (which is at this point is a regular index).
|
||||
|
||||
The unfollow action does not have any options and
|
||||
if it encounters a non follower index, then the
|
||||
unfollow action leaves that index untouched and
|
||||
lets the next action operate on this index.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
PUT _ilm/policy/my_policy
|
||||
{
|
||||
"policy": {
|
||||
"phases": {
|
||||
"hot": {
|
||||
"actions": {
|
||||
"unfollow" : {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
=== Full Policy
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
|
@ -429,7 +430,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new)
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
|
||||
abstract class AbstractUnfollowIndexStep extends AsyncActionStep {
|
||||
|
||||
AbstractUnfollowIndexStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
|
||||
String followerIndex = indexMetaData.getIndex().getName();
|
||||
Map<String, String> customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY);
|
||||
if (customIndexMetadata == null) {
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
|
||||
innerPerformAction(followerIndex, listener);
|
||||
}
|
||||
|
||||
abstract void innerPerformAction(String followerIndex, Listener listener);
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
||||
final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep {
|
||||
|
||||
static final String NAME = "close-follower-index";
|
||||
|
||||
CloseFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
void innerPerformAction(String followerIndex, Listener listener) {
|
||||
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex);
|
||||
getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap(
|
||||
r -> {
|
||||
assert r.isAcknowledged() : "close index response is not acknowledged";
|
||||
listener.onResponse(true);
|
||||
},
|
||||
listener::onFailure)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
||||
final class OpenFollowerIndexStep extends AsyncActionStep {
|
||||
|
||||
static final String NAME = "open-follower-index";
|
||||
|
||||
OpenFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
|
||||
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName());
|
||||
getClient().admin().indices().open(request, ActionListener.wrap(
|
||||
r -> {
|
||||
assert r.isAcknowledged() : "open index response is not acknowledged";
|
||||
listener.onResponse(true);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
||||
final class PauseFollowerIndexStep extends AbstractUnfollowIndexStep {
|
||||
|
||||
static final String NAME = "pause-follower-index";
|
||||
|
||||
PauseFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
void innerPerformAction(String followerIndex, Listener listener) {
|
||||
PauseFollowAction.Request request = new PauseFollowAction.Request(followerIndex);
|
||||
getClient().execute(PauseFollowAction.INSTANCE, request, ActionListener.wrap(
|
||||
r -> {
|
||||
assert r.isAcknowledged() : "pause follow response is not acknowledged";
|
||||
listener.onResponse(true);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
}
|
|
@ -34,10 +34,11 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
|
||||
public static final String TYPE = "timeseries";
|
||||
static final List<String> VALID_PHASES = Arrays.asList("hot", "warm", "cold", "delete");
|
||||
static final List<String> ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, RolloverAction.NAME);
|
||||
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME,
|
||||
ShrinkAction.NAME, ForceMergeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME);
|
||||
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
|
||||
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME);
|
||||
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
|
||||
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Converts a CCR following index into a normal, standalone index, once the index is ready to be safely separated.
|
||||
*
|
||||
* "Readiness" is composed of two conditions:
|
||||
* 1) The index must have {@link LifecycleSettings#LIFECYCLE_INDEXING_COMPLETE} set to {@code true}, which is
|
||||
* done automatically by {@link RolloverAction} (or manually).
|
||||
* 2) The index must be up to date with the leader, defined as the follower checkpoint being
|
||||
* equal to the global checkpoint for all shards.
|
||||
*/
|
||||
public final class UnfollowAction implements LifecycleAction {
|
||||
|
||||
public static final String NAME = "unfollow";
|
||||
public static final String CCR_METADATA_KEY = "ccr";
|
||||
|
||||
public UnfollowAction() {}
|
||||
|
||||
@Override
|
||||
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
|
||||
StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME);
|
||||
StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME);
|
||||
StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME);
|
||||
StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME);
|
||||
StepKey unfollowFollowerIndex = new StepKey(phase, NAME, UnfollowFollowIndexStep.NAME);
|
||||
StepKey openFollowerIndex = new StepKey(phase, NAME, OpenFollowerIndexStep.NAME);
|
||||
StepKey waitForYellowStep = new StepKey(phase, NAME, WaitForYellowStep.NAME);
|
||||
|
||||
WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks);
|
||||
WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
|
||||
PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
|
||||
CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
|
||||
UnfollowFollowIndexStep step5 = new UnfollowFollowIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
|
||||
OpenFollowerIndexStep step6 = new OpenFollowerIndexStep(openFollowerIndex, waitForYellowStep, client);
|
||||
WaitForYellowStep step7 = new WaitForYellowStep(waitForYellowStep, nextStepKey);
|
||||
return Arrays.asList(step1, step2, step3, step4, step5, step6, step7);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StepKey> toStepKeys(String phase) {
|
||||
StepKey indexingCompleteStep = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME);
|
||||
StepKey waitForFollowShardTasksStep = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME);
|
||||
StepKey pauseFollowerIndexStep = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME);
|
||||
StepKey closeFollowerIndexStep = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME);
|
||||
StepKey unfollowIndexStep = new StepKey(phase, NAME, UnfollowFollowIndexStep.NAME);
|
||||
StepKey openFollowerIndexStep = new StepKey(phase, NAME, OpenFollowerIndexStep.NAME);
|
||||
StepKey waitForYellowStep = new StepKey(phase, NAME, WaitForYellowStep.NAME);
|
||||
return Arrays.asList(indexingCompleteStep, waitForFollowShardTasksStep, pauseFollowerIndexStep,
|
||||
closeFollowerIndexStep, unfollowIndexStep, openFollowerIndexStep, waitForYellowStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSafeAction() {
|
||||
// There are no settings to change, so therefor this action should be safe:
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public UnfollowAction(StreamInput in) throws IOException {}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {}
|
||||
|
||||
private static final ObjectParser<UnfollowAction, Void> PARSER = new ObjectParser<>(NAME, UnfollowAction::new);
|
||||
|
||||
public static UnfollowAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 36970;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
||||
|
||||
final class UnfollowFollowIndexStep extends AbstractUnfollowIndexStep {
|
||||
|
||||
static final String NAME = "unfollow-follower-index";
|
||||
|
||||
UnfollowFollowIndexStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
void innerPerformAction(String followerIndex, Listener listener) {
|
||||
UnfollowAction.Request request = new UnfollowAction.Request(followerIndex);
|
||||
getClient().execute(UnfollowAction.INSTANCE, request, ActionListener.wrap(
|
||||
r -> {
|
||||
assert r.isAcknowledged() : "unfollow response is not acknowledged";
|
||||
listener.onResponse(true);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
|
||||
final class WaitForFollowShardTasksStep extends AsyncWaitStep {
|
||||
|
||||
static final String NAME = "wait-for-follow-shard-tasks";
|
||||
|
||||
WaitForFollowShardTasksStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
|
||||
Map<String, String> customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY);
|
||||
if (customIndexMetadata == null) {
|
||||
listener.onResponse(true, null);
|
||||
return;
|
||||
}
|
||||
|
||||
FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
request.setIndices(new String[]{indexMetaData.getIndex().getName()});
|
||||
getClient().execute(FollowStatsAction.INSTANCE, request,
|
||||
ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure));
|
||||
}
|
||||
|
||||
void handleResponse(FollowStatsAction.StatsResponses responses, Listener listener) {
|
||||
List<ShardFollowNodeTaskStatus> unSyncedShardFollowStatuses = responses.getStatsResponses()
|
||||
.stream()
|
||||
.map(FollowStatsAction.StatsResponse::status)
|
||||
.filter(shardFollowStatus -> shardFollowStatus.leaderGlobalCheckpoint() != shardFollowStatus.followerGlobalCheckpoint())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Follow stats api needs to return stats for follower index and all shard follow tasks should be synced:
|
||||
boolean conditionMet = responses.getStatsResponses().size() > 0 && unSyncedShardFollowStatuses.isEmpty();
|
||||
if (conditionMet) {
|
||||
listener.onResponse(true, null);
|
||||
} else {
|
||||
List<Info.ShardFollowTaskInfo> shardFollowTaskInfos = unSyncedShardFollowStatuses
|
||||
.stream()
|
||||
.map(status -> new Info.ShardFollowTaskInfo(status.followerIndex(), status.getShardId(),
|
||||
status.leaderGlobalCheckpoint(), status.followerGlobalCheckpoint()))
|
||||
.collect(Collectors.toList());
|
||||
listener.onResponse(false, new Info(shardFollowTaskInfos));
|
||||
}
|
||||
}
|
||||
|
||||
static final class Info implements ToXContentObject {
|
||||
|
||||
static final ParseField SHARD_FOLLOW_TASKS = new ParseField("shard_follow_tasks");
|
||||
static final ParseField MESSAGE = new ParseField("message");
|
||||
|
||||
private final List<ShardFollowTaskInfo> shardFollowTaskInfos;
|
||||
|
||||
Info(List<ShardFollowTaskInfo> shardFollowTaskInfos) {
|
||||
this.shardFollowTaskInfos = shardFollowTaskInfos;
|
||||
}
|
||||
|
||||
List<ShardFollowTaskInfo> getShardFollowTaskInfos() {
|
||||
return shardFollowTaskInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(SHARD_FOLLOW_TASKS.getPreferredName(), shardFollowTaskInfos);
|
||||
String message;
|
||||
if (shardFollowTaskInfos.size() > 0) {
|
||||
message = "Waiting for [" + shardFollowTaskInfos.size() + "] shard follow tasks to be in sync";
|
||||
} else {
|
||||
message = "Waiting for following to be unpaused and all shard follow tasks to be up to date";
|
||||
}
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Info info = (Info) o;
|
||||
return Objects.equals(shardFollowTaskInfos, info.shardFollowTaskInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(shardFollowTaskInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
static final class ShardFollowTaskInfo implements ToXContentObject {
|
||||
|
||||
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
|
||||
static final ParseField SHARD_ID_FIELD = new ParseField("shard_id");
|
||||
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
|
||||
static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint");
|
||||
|
||||
private final String followerIndex;
|
||||
private final int shardId;
|
||||
private final long leaderGlobalCheckpoint;
|
||||
private final long followerGlobalCheckpoint;
|
||||
|
||||
ShardFollowTaskInfo(String followerIndex, int shardId, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
|
||||
this.followerIndex = followerIndex;
|
||||
this.shardId = shardId;
|
||||
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
|
||||
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
|
||||
}
|
||||
|
||||
String getFollowerIndex() {
|
||||
return followerIndex;
|
||||
}
|
||||
|
||||
|
||||
int getShardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
long getLeaderGlobalCheckpoint() {
|
||||
return leaderGlobalCheckpoint;
|
||||
}
|
||||
|
||||
long getFollowerGlobalCheckpoint() {
|
||||
return followerGlobalCheckpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
|
||||
builder.field(SHARD_ID_FIELD.getPreferredName(), shardId);
|
||||
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
|
||||
builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ShardFollowTaskInfo that = (ShardFollowTaskInfo) o;
|
||||
return shardId == that.shardId &&
|
||||
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
|
||||
followerGlobalCheckpoint == that.followerGlobalCheckpoint &&
|
||||
Objects.equals(followerIndex, that.followerIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(followerIndex, shardId, leaderGlobalCheckpoint, followerGlobalCheckpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
|
||||
final class WaitForIndexingCompleteStep extends ClusterStateWaitStep {
|
||||
private static final Logger logger = LogManager.getLogger(WaitForIndexingCompleteStep.class);
|
||||
|
||||
static final String NAME = "wait-for-indexing-complete";
|
||||
|
||||
WaitForIndexingCompleteStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
IndexMetaData followerIndex = clusterState.metaData().index(index);
|
||||
if (followerIndex == null) {
|
||||
// Index must have been since deleted, ignore it
|
||||
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName());
|
||||
return new Result(false, null);
|
||||
}
|
||||
Map<String, String> customIndexMetadata = followerIndex.getCustomData(CCR_METADATA_KEY);
|
||||
if (customIndexMetadata == null) {
|
||||
return new Result(true, null);
|
||||
}
|
||||
|
||||
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(followerIndex.getSettings());
|
||||
if (indexingComplete) {
|
||||
return new Result(true, null);
|
||||
} else {
|
||||
return new Result(false, new IndexingNotCompleteInfo());
|
||||
}
|
||||
}
|
||||
|
||||
static final class IndexingNotCompleteInfo implements ToXContentObject {
|
||||
|
||||
static final ParseField MESSAGE_FIELD = new ParseField("message");
|
||||
static final ParseField INDEXING_COMPLETE = new ParseField(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE);
|
||||
|
||||
private final String message;
|
||||
|
||||
IndexingNotCompleteInfo() {
|
||||
this.message = "waiting for the [" + LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE +
|
||||
"] setting to be set to true on the leader index, it is currently [false]";
|
||||
}
|
||||
|
||||
String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE_FIELD.getPreferredName(), message);
|
||||
builder.field(INDEXING_COMPLETE.getPreferredName(), false);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
IndexingNotCompleteInfo info = (IndexingNotCompleteInfo) o;
|
||||
return Objects.equals(getMessage(), info.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
class WaitForYellowStep extends ClusterStateWaitStep {
|
||||
|
||||
static final String NAME = "wait-for-yellow-step";
|
||||
|
||||
WaitForYellowStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
RoutingTable routingTable = clusterState.routingTable();
|
||||
IndexRoutingTable indexShardRoutingTable = routingTable.index(index);
|
||||
if (indexShardRoutingTable == null) {
|
||||
return new Result(false, new Info("index is red; no IndexRoutingTable"));
|
||||
}
|
||||
|
||||
boolean indexIsAtLeastYellow = indexShardRoutingTable.allPrimaryShardsActive();
|
||||
if (indexIsAtLeastYellow) {
|
||||
return new Result(true, null);
|
||||
} else {
|
||||
return new Result(false, new Info("index is red; not all primary shards are active"));
|
||||
}
|
||||
}
|
||||
|
||||
static final class Info implements ToXContentObject {
|
||||
|
||||
static final ParseField MESSAGE_FIELD = new ParseField("message");
|
||||
|
||||
private final String message;
|
||||
|
||||
Info(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE_FIELD.getPreferredName(), message);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Info info = (Info) o;
|
||||
return Objects.equals(getMessage(), info.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public abstract class AbstractUnfollowIndexStepTestCase<T extends AbstractUnfollowIndexStep> extends AbstractStepTestCase<T> {
|
||||
|
||||
@Override
|
||||
protected final T createRandomInstance() {
|
||||
Step.StepKey stepKey = randomStepKey();
|
||||
Step.StepKey nextStepKey = randomStepKey();
|
||||
return newInstance(stepKey, nextStepKey, Mockito.mock(Client.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final T mutateInstance(T instance) {
|
||||
Step.StepKey key = instance.getKey();
|
||||
Step.StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
if (randomBoolean()) {
|
||||
key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
} else {
|
||||
nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
return newInstance(key, nextKey, instance.getClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final T copyInstance(T instance) {
|
||||
return newInstance(instance.getKey(), instance.getNextStepKey(), instance.getClient());
|
||||
}
|
||||
|
||||
public final void testNotAFollowerIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
T step = newInstance(randomStepKey(), randomStepKey(), client);
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], is(true));
|
||||
assertThat(failure[0], nullValue());
|
||||
Mockito.verifyZeroInteractions(client);
|
||||
}
|
||||
|
||||
protected abstract T newInstance(Step.StepKey key, Step.StepKey nextKey, Client client);
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class CloseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCase<CloseFollowerIndexStep> {
|
||||
|
||||
@Override
|
||||
protected CloseFollowerIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) {
|
||||
return new CloseFollowerIndexStep(key, nextKey, client);
|
||||
}
|
||||
|
||||
public void testCloseFollowingIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Mockito.doAnswer(invocation -> {
|
||||
CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0];
|
||||
assertThat(closeIndexRequest.indices()[0], equalTo("follower-index"));
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[1];
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
return null;
|
||||
}).when(indicesClient).close(Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], is(true));
|
||||
assertThat(failure[0], nullValue());
|
||||
}
|
||||
|
||||
public void testCloseFollowingIndexFailed() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
// Mock pause follow api call:
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Exception error = new RuntimeException();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0];
|
||||
assertThat(closeIndexRequest.indices()[0], equalTo("follower-index"));
|
||||
ActionListener listener = (ActionListener) invocation.getArguments()[1];
|
||||
listener.onFailure(error);
|
||||
return null;
|
||||
}).when(indicesClient).close(Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], nullValue());
|
||||
assertThat(failure[0], sameInstance(error));
|
||||
Mockito.verify(indicesClient).close(Mockito.any(), Mockito.any());
|
||||
Mockito.verifyNoMoreInteractions(indicesClient);
|
||||
}
|
||||
}
|
|
@ -45,7 +45,8 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new)
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -62,7 +63,8 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
|
|
@ -54,7 +54,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new)
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -71,7 +72,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
@ -120,6 +122,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return new FreezeAction();
|
||||
case SetPriorityAction.NAME:
|
||||
return SetPriorityActionTests.randomInstance();
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action [" + action + "]");
|
||||
}};
|
||||
|
@ -170,6 +174,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return new FreezeAction();
|
||||
case SetPriorityAction.NAME:
|
||||
return SetPriorityActionTests.randomInstance();
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action [" + action + "]");
|
||||
}};
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class OpenFollowerIndexStepTests extends AbstractStepTestCase<OpenFollowerIndexStep> {
|
||||
|
||||
@Override
|
||||
protected OpenFollowerIndexStep createRandomInstance() {
|
||||
Step.StepKey stepKey = randomStepKey();
|
||||
Step.StepKey nextStepKey = randomStepKey();
|
||||
return new OpenFollowerIndexStep(stepKey, nextStepKey, Mockito.mock(Client.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OpenFollowerIndexStep mutateInstance(OpenFollowerIndexStep instance) {
|
||||
Step.StepKey key = instance.getKey();
|
||||
Step.StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
if (randomBoolean()) {
|
||||
key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
} else {
|
||||
nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
return new OpenFollowerIndexStep(key, nextKey, instance.getClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OpenFollowerIndexStep copyInstance(OpenFollowerIndexStep instance) {
|
||||
return new OpenFollowerIndexStep(instance.getKey(), instance.getNextStepKey(), instance.getClient());
|
||||
}
|
||||
|
||||
public void testOpenFollowingIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Mockito.doAnswer(invocation -> {
|
||||
OpenIndexRequest closeIndexRequest = (OpenIndexRequest) invocation.getArguments()[0];
|
||||
assertThat(closeIndexRequest.indices()[0], equalTo("follower-index"));
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<OpenIndexResponse> listener = (ActionListener<OpenIndexResponse>) invocation.getArguments()[1];
|
||||
listener.onResponse(new OpenIndexResponse(true, true));
|
||||
return null;
|
||||
}).when(indicesClient).open(Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
OpenFollowerIndexStep step = new OpenFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], is(true));
|
||||
assertThat(failure[0], nullValue());
|
||||
}
|
||||
|
||||
public void testOpenFollowingIndexFailed() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Exception error = new RuntimeException();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
OpenIndexRequest closeIndexRequest = (OpenIndexRequest) invocation.getArguments()[0];
|
||||
assertThat(closeIndexRequest.indices()[0], equalTo("follower-index"));
|
||||
ActionListener listener = (ActionListener) invocation.getArguments()[1];
|
||||
listener.onFailure(error);
|
||||
return null;
|
||||
}).when(indicesClient).open(Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
OpenFollowerIndexStep step = new OpenFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], nullValue());
|
||||
assertThat(failure[0], sameInstance(error));
|
||||
Mockito.verify(indicesClient).open(Mockito.any(), Mockito.any());
|
||||
Mockito.verifyNoMoreInteractions(indicesClient);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class PauseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCase<PauseFollowerIndexStep> {
|
||||
|
||||
@Override
|
||||
protected PauseFollowerIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) {
|
||||
return new PauseFollowerIndexStep(key, nextKey, client);
|
||||
}
|
||||
|
||||
public void testPauseFollowingIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Mockito.doAnswer(invocation -> {
|
||||
PauseFollowAction.Request request = (PauseFollowAction.Request) invocation.getArguments()[1];
|
||||
assertThat(request.getFollowIndex(), equalTo("follower-index"));
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
return null;
|
||||
}).when(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
PauseFollowerIndexStep step = new PauseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], is(true));
|
||||
assertThat(failure[0], nullValue());
|
||||
}
|
||||
|
||||
public void testPauseFollowingIndexFailed() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
// Mock pause follow api call:
|
||||
Client client = Mockito.mock(Client.class);
|
||||
Exception error = new RuntimeException();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
PauseFollowAction.Request request = (PauseFollowAction.Request) invocation.getArguments()[1];
|
||||
assertThat(request.getFollowIndex(), equalTo("follower-index"));
|
||||
ActionListener listener = (ActionListener) invocation.getArguments()[2];
|
||||
listener.onFailure(error);
|
||||
return null;
|
||||
}).when(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
PauseFollowerIndexStep step = new PauseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], nullValue());
|
||||
assertThat(failure[0], sameInstance(error));
|
||||
Mockito.verify(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
Mockito.verifyNoMoreInteractions(client);
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
private static final ReadOnlyAction TEST_READ_ONLY_ACTION = new ReadOnlyAction();
|
||||
private static final FreezeAction TEST_FREEZE_ACTION = new FreezeAction();
|
||||
private static final SetPriorityAction TEST_PRIORITY_ACTION = new SetPriorityAction(0);
|
||||
private static final UnfollowAction TEST_UNFOLLOW_ACTION = new UnfollowAction();
|
||||
|
||||
public void testValidatePhases() {
|
||||
boolean invalid = randomBoolean();
|
||||
|
@ -305,10 +306,14 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
|
||||
public void testGetNextActionName() {
|
||||
// Hot Phase
|
||||
assertNextActionName("hot", SetPriorityAction.NAME, UnfollowAction.NAME,
|
||||
new String[] {UnfollowAction.NAME, RolloverAction.NAME});
|
||||
assertNextActionName("hot", SetPriorityAction.NAME, RolloverAction.NAME, new String[]{RolloverAction.NAME});
|
||||
assertNextActionName("hot", SetPriorityAction.NAME, null, new String[] {});
|
||||
assertNextActionName("hot", SetPriorityAction.NAME, RolloverAction.NAME, new String[]{SetPriorityAction.NAME, RolloverAction.NAME});
|
||||
|
||||
assertNextActionName("hot", RolloverAction.NAME, null, new String[] {});
|
||||
assertNextActionName("hot", RolloverAction.NAME, null, new String[] { RolloverAction.NAME });
|
||||
|
||||
assertInvalidAction("hot", "foo", new String[] { RolloverAction.NAME });
|
||||
assertInvalidAction("hot", AllocateAction.NAME, new String[] { RolloverAction.NAME });
|
||||
assertInvalidAction("hot", DeleteAction.NAME, new String[] { RolloverAction.NAME });
|
||||
|
@ -317,6 +322,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
assertInvalidAction("hot", ShrinkAction.NAME, new String[] { RolloverAction.NAME });
|
||||
|
||||
// Warm Phase
|
||||
assertNextActionName("warm", SetPriorityAction.NAME, UnfollowAction.NAME,
|
||||
new String[]{SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME,
|
||||
ShrinkAction.NAME, ForceMergeAction.NAME});
|
||||
assertNextActionName("warm", SetPriorityAction.NAME, ReadOnlyAction.NAME,
|
||||
new String[]{SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME});
|
||||
assertNextActionName("warm", SetPriorityAction.NAME, AllocateAction.NAME,
|
||||
|
@ -327,6 +335,17 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
new String[]{SetPriorityAction.NAME, ForceMergeAction.NAME});
|
||||
assertNextActionName("warm", SetPriorityAction.NAME, null, new String[]{SetPriorityAction.NAME});
|
||||
|
||||
assertNextActionName("warm", UnfollowAction.NAME, ReadOnlyAction.NAME,
|
||||
new String[] { SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", UnfollowAction.NAME, ReadOnlyAction.NAME,
|
||||
new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", UnfollowAction.NAME, AllocateAction.NAME,
|
||||
new String[] { AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", UnfollowAction.NAME, ShrinkAction.NAME,
|
||||
new String[] { ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", UnfollowAction.NAME, ForceMergeAction.NAME, new String[] { ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", UnfollowAction.NAME, null, new String[] {});
|
||||
|
||||
assertNextActionName("warm", ReadOnlyAction.NAME, AllocateAction.NAME,
|
||||
new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
assertNextActionName("warm", ReadOnlyAction.NAME, ShrinkAction.NAME,
|
||||
|
@ -371,15 +390,27 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME });
|
||||
|
||||
// Cold Phase
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, FreezeAction.NAME, new String[]{SetPriorityAction.NAME, FreezeAction.NAME});
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, UnfollowAction.NAME,
|
||||
new String[]{UnfollowAction.NAME, SetPriorityAction.NAME, FreezeAction.NAME});
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, FreezeAction.NAME,
|
||||
new String[]{SetPriorityAction.NAME, FreezeAction.NAME});
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, AllocateAction.NAME,
|
||||
new String[]{SetPriorityAction.NAME, AllocateAction.NAME});
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, null, new String[] { SetPriorityAction.NAME });
|
||||
assertNextActionName("cold", SetPriorityAction.NAME, null, new String[] {});
|
||||
|
||||
assertNextActionName("cold", UnfollowAction.NAME, AllocateAction.NAME,
|
||||
new String[] {SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME});
|
||||
assertNextActionName("cold", UnfollowAction.NAME, AllocateAction.NAME,
|
||||
new String[] {AllocateAction.NAME, FreezeAction.NAME});
|
||||
assertNextActionName("cold", UnfollowAction.NAME, FreezeAction.NAME, new String[] {FreezeAction.NAME});
|
||||
assertNextActionName("cold", UnfollowAction.NAME, null, new String[] {});
|
||||
|
||||
assertNextActionName("cold", AllocateAction.NAME, null, new String[] { AllocateAction.NAME });
|
||||
assertNextActionName("cold", AllocateAction.NAME, null, new String[] {});
|
||||
assertNextActionName("cold", AllocateAction.NAME, null, new String[] {});
|
||||
assertNextActionName("cold", AllocateAction.NAME, FreezeAction.NAME, FreezeAction.NAME);
|
||||
|
||||
assertNextActionName("cold", FreezeAction.NAME, null);
|
||||
assertNextActionName("cold", FreezeAction.NAME, null, AllocateAction.NAME);
|
||||
|
||||
|
@ -393,6 +424,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
// Delete Phase
|
||||
assertNextActionName("delete", DeleteAction.NAME, null, new String[] {});
|
||||
assertNextActionName("delete", DeleteAction.NAME, null, new String[] { DeleteAction.NAME });
|
||||
|
||||
assertInvalidAction("delete", "foo", new String[] { DeleteAction.NAME });
|
||||
assertInvalidAction("delete", AllocateAction.NAME, new String[] { DeleteAction.NAME });
|
||||
assertInvalidAction("delete", ForceMergeAction.NAME, new String[] { DeleteAction.NAME });
|
||||
|
@ -401,6 +433,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
assertInvalidAction("delete", ShrinkAction.NAME, new String[] { DeleteAction.NAME });
|
||||
assertInvalidAction("delete", FreezeAction.NAME, new String[] { DeleteAction.NAME });
|
||||
assertInvalidAction("delete", SetPriorityAction.NAME, new String[] { DeleteAction.NAME });
|
||||
assertInvalidAction("delete", UnfollowAction.NAME, new String[] { DeleteAction.NAME });
|
||||
|
||||
Phase phase = new Phase("foo", TimeValue.ZERO, Collections.emptyMap());
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
|
@ -444,6 +477,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
return new FreezeAction();
|
||||
case SetPriorityAction.NAME:
|
||||
return new SetPriorityAction(0);
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
}
|
||||
return new DeleteAction();
|
||||
}).collect(Collectors.toConcurrentMap(LifecycleAction::getWriteableName, Function.identity()));
|
||||
|
@ -509,6 +544,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
return TEST_FREEZE_ACTION;
|
||||
case SetPriorityAction.NAME:
|
||||
return TEST_PRIORITY_ACTION;
|
||||
case UnfollowAction.NAME:
|
||||
return TEST_UNFOLLOW_ACTION;
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported timeseries phase action [" + actionName + "]");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class UnfollowActionTests extends AbstractActionTestCase<UnfollowAction> {
|
||||
|
||||
@Override
|
||||
protected UnfollowAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return UnfollowAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UnfollowAction createTestInstance() {
|
||||
return new UnfollowAction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<UnfollowAction> instanceReader() {
|
||||
return UnfollowAction::new;
|
||||
}
|
||||
|
||||
public void testToSteps() {
|
||||
UnfollowAction action = createTestInstance();
|
||||
String phase = randomAlphaOfLengthBetween(1, 10);
|
||||
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
||||
randomAlphaOfLengthBetween(1, 10));
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertThat(steps, notNullValue());
|
||||
assertThat(steps.size(), equalTo(7));
|
||||
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME);
|
||||
StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
|
||||
StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
|
||||
StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
|
||||
StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowIndexStep.NAME);
|
||||
StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, OpenFollowerIndexStep.NAME);
|
||||
StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForYellowStep.NAME);
|
||||
|
||||
WaitForIndexingCompleteStep firstStep = (WaitForIndexingCompleteStep) steps.get(0);
|
||||
assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
|
||||
assertThat(firstStep.getNextStepKey(), equalTo(expectedSecondStepKey));
|
||||
|
||||
WaitForFollowShardTasksStep secondStep = (WaitForFollowShardTasksStep) steps.get(1);
|
||||
assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey));
|
||||
assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey));
|
||||
|
||||
PauseFollowerIndexStep thirdStep = (PauseFollowerIndexStep) steps.get(2);
|
||||
assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey));
|
||||
assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey));
|
||||
|
||||
CloseFollowerIndexStep fourthStep = (CloseFollowerIndexStep) steps.get(3);
|
||||
assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey));
|
||||
assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey));
|
||||
|
||||
UnfollowFollowIndexStep fifthStep = (UnfollowFollowIndexStep) steps.get(4);
|
||||
assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey));
|
||||
assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey));
|
||||
|
||||
OpenFollowerIndexStep sixthStep = (OpenFollowerIndexStep) steps.get(5);
|
||||
assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey));
|
||||
assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey));
|
||||
|
||||
WaitForYellowStep seventhStep = (WaitForYellowStep) steps.get(6);
|
||||
assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey));
|
||||
assertThat(seventhStep.getNextStepKey(), equalTo(nextStepKey));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class UnfollowFollowIndexStepTests extends AbstractUnfollowIndexStepTestCase<UnfollowFollowIndexStep> {
|
||||
|
||||
@Override
|
||||
protected UnfollowFollowIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) {
|
||||
return new UnfollowFollowIndexStep(key, nextKey, client);
|
||||
}
|
||||
|
||||
public void testUnFollow() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Mockito.doAnswer(invocation -> {
|
||||
UnfollowAction.Request request = (UnfollowAction.Request) invocation.getArguments()[1];
|
||||
assertThat(request.getFollowerIndex(), equalTo("follower-index"));
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
return null;
|
||||
}).when(client).execute(Mockito.same(UnfollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], is(true));
|
||||
assertThat(failure[0], nullValue());
|
||||
}
|
||||
|
||||
public void testUnFollowUnfollowFailed() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
// Mock unfollow api call:
|
||||
Exception error = new RuntimeException();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
UnfollowAction.Request request = (UnfollowAction.Request) invocation.getArguments()[1];
|
||||
assertThat(request.getFollowerIndex(), equalTo("follower-index"));
|
||||
ActionListener listener = (ActionListener) invocation.getArguments()[2];
|
||||
listener.onFailure(error);
|
||||
return null;
|
||||
}).when(client).execute(Mockito.same(UnfollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
|
||||
Boolean[] completed = new Boolean[1];
|
||||
Exception[] failure = new Exception[1];
|
||||
UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed[0] = complete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure[0] = e;
|
||||
}
|
||||
});
|
||||
assertThat(completed[0], nullValue());
|
||||
assertThat(failure[0], sameInstance(error));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.WaitForFollowShardTasksStep.Info;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.WaitForFollowShardTasksStep.Info.ShardFollowTaskInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class WaitForFollowShardTasksStepInfoTests extends AbstractXContentTestCase<Info> {
|
||||
|
||||
private static final ConstructingObjectParser<ShardFollowTaskInfo, Void> SHARD_FOLLOW_TASK_INFO_PARSER =
|
||||
new ConstructingObjectParser<>(
|
||||
"shard_follow_task_info_parser",
|
||||
args -> new ShardFollowTaskInfo((String) args[0], (Integer) args[1], (Long) args[2], (Long) args[3])
|
||||
);
|
||||
|
||||
static {
|
||||
SHARD_FOLLOW_TASK_INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), ShardFollowTaskInfo.FOLLOWER_INDEX_FIELD);
|
||||
SHARD_FOLLOW_TASK_INFO_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTaskInfo.SHARD_ID_FIELD);
|
||||
SHARD_FOLLOW_TASK_INFO_PARSER.declareLong(ConstructingObjectParser.constructorArg(),
|
||||
ShardFollowTaskInfo.LEADER_GLOBAL_CHECKPOINT_FIELD);
|
||||
SHARD_FOLLOW_TASK_INFO_PARSER.declareLong(ConstructingObjectParser.constructorArg(),
|
||||
ShardFollowTaskInfo.FOLLOWER_GLOBAL_CHECKPOINT_FIELD);
|
||||
}
|
||||
|
||||
private static final ConstructingObjectParser<Info, Void> INFO_PARSER = new ConstructingObjectParser<>(
|
||||
"info_parser",
|
||||
args -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
Info info = new Info((List<ShardFollowTaskInfo>) args[0]);
|
||||
return info;
|
||||
}
|
||||
);
|
||||
|
||||
static {
|
||||
INFO_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), SHARD_FOLLOW_TASK_INFO_PARSER,
|
||||
Info.SHARD_FOLLOW_TASKS);
|
||||
INFO_PARSER.declareString((i, s) -> {}, Info.MESSAGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Info createTestInstance() {
|
||||
int numInfos = randomIntBetween(0, 32);
|
||||
List<ShardFollowTaskInfo> shardFollowTaskInfos = new ArrayList<>(numInfos);
|
||||
for (int i = 0; i < numInfos; i++) {
|
||||
shardFollowTaskInfos.add(new ShardFollowTaskInfo(randomAlphaOfLength(3), randomIntBetween(0, 10),
|
||||
randomNonNegativeLong(), randomNonNegativeLong()));
|
||||
}
|
||||
return new Info(shardFollowTaskInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Info doParseInstance(XContentParser parser) throws IOException {
|
||||
return INFO_PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.IsNull.notNullValue;
|
||||
|
||||
public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase<WaitForFollowShardTasksStep> {
|
||||
|
||||
@Override
|
||||
protected WaitForFollowShardTasksStep createRandomInstance() {
|
||||
StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
return new WaitForFollowShardTasksStep(stepKey, nextStepKey, Mockito.mock(Client.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForFollowShardTasksStep mutateInstance(WaitForFollowShardTasksStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
if (randomBoolean()) {
|
||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
} else {
|
||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
return new WaitForFollowShardTasksStep(key, nextKey, instance.getClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForFollowShardTasksStep copyInstance(WaitForFollowShardTasksStep instance) {
|
||||
return new WaitForFollowShardTasksStep(instance.getKey(), instance.getNextStepKey(), instance.getClient());
|
||||
}
|
||||
|
||||
public void testConditionMet() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(2)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
Client client = Mockito.mock(Client.class);
|
||||
List<FollowStatsAction.StatsResponse> statsResponses = Arrays.asList(
|
||||
new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(0, 9, 9)),
|
||||
new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(1, 3, 3))
|
||||
);
|
||||
mockFollowStatsCall(client, indexMetadata.getIndex().getName(), statsResponses);
|
||||
|
||||
WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client);
|
||||
final boolean[] conditionMetHolder = new boolean[1];
|
||||
final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
|
||||
final Exception[] exceptionHolder = new Exception[1];
|
||||
step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
|
||||
conditionMetHolder[0] = conditionMet;
|
||||
informationContextHolder[0] = informationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptionHolder[0] = e;
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(conditionMetHolder[0], is(true));
|
||||
assertThat(informationContextHolder[0], nullValue());
|
||||
assertThat(exceptionHolder[0], nullValue());
|
||||
}
|
||||
|
||||
public void testConditionNotMetShardsNotInSync() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(2)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
Client client = Mockito.mock(Client.class);
|
||||
List<FollowStatsAction.StatsResponse> statsResponses = Arrays.asList(
|
||||
new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(0, 9, 9)),
|
||||
new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(1, 8, 3))
|
||||
);
|
||||
mockFollowStatsCall(client, indexMetadata.getIndex().getName(), statsResponses);
|
||||
|
||||
WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client);
|
||||
final boolean[] conditionMetHolder = new boolean[1];
|
||||
final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
|
||||
final Exception[] exceptionHolder = new Exception[1];
|
||||
step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
|
||||
conditionMetHolder[0] = conditionMet;
|
||||
informationContextHolder[0] = informationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptionHolder[0] = e;
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(conditionMetHolder[0], is(false));
|
||||
assertThat(informationContextHolder[0], notNullValue());
|
||||
assertThat(exceptionHolder[0], nullValue());
|
||||
WaitForFollowShardTasksStep.Info info = (WaitForFollowShardTasksStep.Info) informationContextHolder[0];
|
||||
assertThat(info.getShardFollowTaskInfos().size(), equalTo(1));
|
||||
assertThat(info.getShardFollowTaskInfos().get(0).getShardId(), equalTo(1));
|
||||
assertThat(info.getShardFollowTaskInfos().get(0).getLeaderGlobalCheckpoint(), equalTo(8L));
|
||||
assertThat(info.getShardFollowTaskInfos().get(0).getFollowerGlobalCheckpoint(), equalTo(3L));
|
||||
}
|
||||
|
||||
public void testConditionNotMetNotAFollowerIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.numberOfShards(2)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
Client client = Mockito.mock(Client.class);
|
||||
|
||||
WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client);
|
||||
final boolean[] conditionMetHolder = new boolean[1];
|
||||
final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
|
||||
final Exception[] exceptionHolder = new Exception[1];
|
||||
step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
|
||||
conditionMetHolder[0] = conditionMet;
|
||||
informationContextHolder[0] = informationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exceptionHolder[0] = e;
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(conditionMetHolder[0], is(true));
|
||||
assertThat(informationContextHolder[0], nullValue());
|
||||
assertThat(exceptionHolder[0], nullValue());
|
||||
Mockito.verifyZeroInteractions(client);
|
||||
}
|
||||
|
||||
private static ShardFollowNodeTaskStatus createShardFollowTaskStatus(int shardId, long leaderGCP, long followerGCP) {
|
||||
return new ShardFollowNodeTaskStatus(
|
||||
"remote",
|
||||
"leader-index",
|
||||
"follower-index",
|
||||
shardId,
|
||||
leaderGCP,
|
||||
-1,
|
||||
followerGCP,
|
||||
-1,
|
||||
-1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
Collections.emptyNavigableMap(),
|
||||
0,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private void mockFollowStatsCall(Client client, String expectedIndexName, List<FollowStatsAction.StatsResponse> statsResponses) {
|
||||
Mockito.doAnswer(invocationOnMock -> {
|
||||
FollowStatsAction.StatsRequest request = (FollowStatsAction.StatsRequest) invocationOnMock.getArguments()[1];
|
||||
assertThat(request.indices().length, equalTo(1));
|
||||
assertThat(request.indices()[0], equalTo(expectedIndexName));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<FollowStatsAction.StatsResponses> listener =
|
||||
(ActionListener<FollowStatsAction.StatsResponses>) invocationOnMock.getArguments()[2];
|
||||
listener.onResponse(new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), statsResponses));
|
||||
return null;
|
||||
}).when(client).execute(Mockito.eq(FollowStatsAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class WaitForIndexingCompleteStepTests extends AbstractStepTestCase<WaitForIndexingCompleteStep> {
|
||||
|
||||
@Override
|
||||
protected WaitForIndexingCompleteStep createRandomInstance() {
|
||||
StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
return new WaitForIndexingCompleteStep(stepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForIndexingCompleteStep mutateInstance(WaitForIndexingCompleteStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
if (randomBoolean()) {
|
||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
} else {
|
||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
return new WaitForIndexingCompleteStep(key, nextKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForIndexingCompleteStep copyInstance(WaitForIndexingCompleteStep instance) {
|
||||
return new WaitForIndexingCompleteStep(instance.getKey(), instance.getNextStepKey());
|
||||
}
|
||||
|
||||
public void testConditionMet() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.build();
|
||||
|
||||
WaitForIndexingCompleteStep step = createRandomInstance();
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(true));
|
||||
assertThat(result.getInfomationContext(), nullValue());
|
||||
}
|
||||
|
||||
public void testConditionMetNotAFollowerIndex() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.build();
|
||||
|
||||
WaitForIndexingCompleteStep step = createRandomInstance();
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(true));
|
||||
assertThat(result.getInfomationContext(), nullValue());
|
||||
}
|
||||
|
||||
public void testConditionNotMet() {
|
||||
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||
if (randomBoolean()) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "false");
|
||||
}
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(indexSettings)
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.build();
|
||||
|
||||
WaitForIndexingCompleteStep step = createRandomInstance();
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
assertThat(result.getInfomationContext(), notNullValue());
|
||||
WaitForIndexingCompleteStep.IndexingNotCompleteInfo info =
|
||||
(WaitForIndexingCompleteStep.IndexingNotCompleteInfo) result.getInfomationContext();
|
||||
assertThat(info.getMessage(), equalTo("waiting for the [index.lifecycle.indexing_complete] setting to be set to " +
|
||||
"true on the leader index, it is currently [false]"));
|
||||
}
|
||||
|
||||
public void testIndexDeleted() {
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().build())
|
||||
.build();
|
||||
|
||||
WaitForIndexingCompleteStep step = createRandomInstance();
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(new Index("this-index-doesnt-exist", "uuid"), clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
assertThat(result.getInfomationContext(), nullValue());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.IsNull.notNullValue;
|
||||
|
||||
public class WaitForYellowStepTests extends AbstractStepTestCase<WaitForYellowStep> {
|
||||
|
||||
@Override
|
||||
protected WaitForYellowStep createRandomInstance() {
|
||||
StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
return new WaitForYellowStep(stepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForYellowStep mutateInstance(WaitForYellowStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
if (randomBoolean()) {
|
||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
} else {
|
||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
return new WaitForYellowStep(key, nextKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForYellowStep copyInstance(WaitForYellowStep instance) {
|
||||
return new WaitForYellowStep(instance.getKey(), instance.getNextStepKey());
|
||||
}
|
||||
|
||||
public void testConditionMet() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ShardRouting shardRouting =
|
||||
TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.STARTED);
|
||||
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
|
||||
.addShard(shardRouting).build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
|
||||
WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey());
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(true));
|
||||
assertThat(result.getInfomationContext(), nullValue());
|
||||
}
|
||||
|
||||
public void testConditionNotMet() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ShardRouting shardRouting =
|
||||
TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING);
|
||||
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
|
||||
.addShard(shardRouting).build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
|
||||
WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey());
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
WaitForYellowStep.Info info = (WaitForYellowStep.Info) result.getInfomationContext();
|
||||
assertThat(info, notNullValue());
|
||||
assertThat(info.getMessage(), equalTo("index is red; not all primary shards are active"));
|
||||
}
|
||||
|
||||
public void testConditionNotMetNoIndexRoutingTable() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().put(indexMetadata, true).build())
|
||||
.routingTable(RoutingTable.builder().build())
|
||||
.build();
|
||||
|
||||
WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey());
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
WaitForYellowStep.Info info = (WaitForYellowStep.Info) result.getInfomationContext();
|
||||
assertThat(info, notNullValue());
|
||||
assertThat(info.getMessage(), equalTo("index is red; no IndexRoutingTable"));
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction.Request;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -68,7 +69,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new)
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -85,7 +87,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
import org.elasticsearch.gradle.test.RestIntegTestTask
|
||||
|
||||
apply plugin: 'elasticsearch.standalone-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(':x-pack:plugin:ccr:qa')
|
||||
}
|
||||
|
||||
task leaderClusterTest(type: RestIntegTestTask) {
|
||||
mustRunAfter(precommit)
|
||||
}
|
||||
|
||||
leaderClusterTestCluster {
|
||||
numNodes = 1
|
||||
clusterName = 'leader-cluster'
|
||||
setting 'xpack.ilm.enabled', 'true'
|
||||
setting 'xpack.ccr.enabled', 'true'
|
||||
setting 'xpack.security.enabled', 'false'
|
||||
setting 'xpack.watcher.enabled', 'false'
|
||||
setting 'xpack.monitoring.enabled', 'false'
|
||||
setting 'xpack.ml.enabled', 'false'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'indices.lifecycle.poll_interval', '1000ms'
|
||||
}
|
||||
|
||||
leaderClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'leader'
|
||||
}
|
||||
|
||||
task followClusterTest(type: RestIntegTestTask) {}
|
||||
|
||||
followClusterTestCluster {
|
||||
dependsOn leaderClusterTestRunner
|
||||
numNodes = 1
|
||||
clusterName = 'follow-cluster'
|
||||
setting 'xpack.ilm.enabled', 'true'
|
||||
setting 'xpack.ccr.enabled', 'true'
|
||||
setting 'xpack.security.enabled', 'false'
|
||||
setting 'xpack.watcher.enabled', 'false'
|
||||
setting 'xpack.monitoring.enabled', 'false'
|
||||
setting 'xpack.ml.enabled', 'false'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'indices.lifecycle.poll_interval', '1000ms'
|
||||
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
|
||||
}
|
||||
|
||||
followClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'follow'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
finalizedBy 'leaderClusterTestCluster#stop'
|
||||
}
|
||||
|
||||
check.dependsOn followClusterTest
|
||||
unitTest.enabled = false // no unit tests for this module, only the rest integration test
|
|
@ -0,0 +1,285 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.ccr.ESCCRRestTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
|
||||
|
||||
public void testBasicCCRAndILMIntegration() throws Exception {
|
||||
String indexName = "logs-1";
|
||||
|
||||
String policyName = "basic-test";
|
||||
if ("leader".equals(targetCluster)) {
|
||||
putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24));
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.lifecycle.name", policyName)
|
||||
.put("index.lifecycle.rollover_alias", "logs")
|
||||
.build();
|
||||
createIndex(indexName, indexSettings, "", "\"logs\": { }");
|
||||
ensureGreen(indexName);
|
||||
} else if ("follow".equals(targetCluster)) {
|
||||
// Policy with the same name must exist in follower cluster too:
|
||||
putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24));
|
||||
followIndex(indexName, indexName);
|
||||
// Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster:
|
||||
client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs"));
|
||||
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
index(leaderClient, indexName, "1");
|
||||
assertDocumentExists(leaderClient, indexName, "1");
|
||||
|
||||
assertBusy(() -> {
|
||||
assertDocumentExists(client(), indexName, "1");
|
||||
// Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset:
|
||||
assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), equalTo("true"));
|
||||
|
||||
assertILMPolicy(leaderClient, indexName, policyName, "hot");
|
||||
assertILMPolicy(client(), indexName, policyName, "hot");
|
||||
});
|
||||
|
||||
updateIndexSettings(leaderClient, indexName, Settings.builder()
|
||||
.put("index.lifecycle.indexing_complete", true)
|
||||
.build()
|
||||
);
|
||||
|
||||
assertBusy(() -> {
|
||||
// Ensure that 'index.lifecycle.indexing_complete' is replicated:
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
|
||||
assertILMPolicy(leaderClient, indexName, policyName, "warm");
|
||||
assertILMPolicy(client(), indexName, policyName, "warm");
|
||||
|
||||
// ILM should have placed both indices in the warm phase and there these indices are read-only:
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true"));
|
||||
assertThat(getIndexSetting(client(), indexName, "index.blocks.write"), equalTo("true"));
|
||||
// ILM should have unfollowed the follower index, so the following_index setting should have been removed:
|
||||
// (this controls whether the follow engine is used)
|
||||
assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue());
|
||||
});
|
||||
}
|
||||
} else {
|
||||
fail("unexpected target cluster [" + targetCluster + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public void testCcrAndIlmWithRollover() throws Exception {
|
||||
String alias = "metrics";
|
||||
String indexName = "metrics-000001";
|
||||
String nextIndexName = "metrics-000002";
|
||||
String policyName = "rollover-test";
|
||||
|
||||
if ("leader".equals(targetCluster)) {
|
||||
// Create a policy on the leader
|
||||
putILMPolicy(policyName, null, 1, null);
|
||||
Request templateRequest = new Request("PUT", "_template/my_template");
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.lifecycle.name", policyName)
|
||||
.put("index.lifecycle.rollover_alias", alias)
|
||||
.build();
|
||||
templateRequest.setJsonEntity("{\"index_patterns\": [\"metrics-*\"], \"settings\": " + Strings.toString(indexSettings) + "}");
|
||||
assertOK(client().performRequest(templateRequest));
|
||||
} else if ("follow".equals(targetCluster)) {
|
||||
// Policy with the same name must exist in follower cluster too:
|
||||
putILMPolicy(policyName, null, 1, null);
|
||||
|
||||
// Set up an auto-follow pattern
|
||||
Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/my_auto_follow_pattern");
|
||||
createAutoFollowRequest.setJsonEntity("{\"leader_index_patterns\": [\"metrics-*\"], " +
|
||||
"\"remote_cluster\": \"leader_cluster\", \"read_poll_timeout\": \"1000ms\"}");
|
||||
assertOK(client().performRequest(createAutoFollowRequest));
|
||||
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
// Create an index on the leader using the template set up above
|
||||
Request createIndexRequest = new Request("PUT", "/" + indexName);
|
||||
createIndexRequest.setJsonEntity("{" +
|
||||
"\"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}, " +
|
||||
"\"aliases\": {\"" + alias + "\": {\"is_write_index\": true}} }");
|
||||
assertOK(leaderClient.performRequest(createIndexRequest));
|
||||
// Check that the new index is creeg
|
||||
Request checkIndexRequest = new Request("GET", "/_cluster/health/" + indexName);
|
||||
checkIndexRequest.addParameter("wait_for_status", "green");
|
||||
checkIndexRequest.addParameter("timeout", "70s");
|
||||
checkIndexRequest.addParameter("level", "shards");
|
||||
assertOK(leaderClient.performRequest(checkIndexRequest));
|
||||
|
||||
// Check that it got replicated to the follower
|
||||
assertBusy(() -> assertTrue(indexExists(indexName)));
|
||||
|
||||
// Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster:
|
||||
client().performRequest(new Request("PUT", "/" + indexName + "/_alias/" + alias));
|
||||
|
||||
index(leaderClient, indexName, "1");
|
||||
assertDocumentExists(leaderClient, indexName, "1");
|
||||
|
||||
assertBusy(() -> {
|
||||
assertDocumentExists(client(), indexName, "1");
|
||||
// Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset:
|
||||
assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), equalTo("true"));
|
||||
});
|
||||
|
||||
// Wait for the index to roll over on the leader
|
||||
assertBusy(() -> {
|
||||
assertOK(leaderClient.performRequest(new Request("HEAD", "/" + nextIndexName)));
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the next index should have been created on the leader
|
||||
assertOK(leaderClient.performRequest(new Request("HEAD", "/" + nextIndexName)));
|
||||
// And the old index should have a write block and indexing complete set
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true"));
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the setting to get replicated to the follower
|
||||
assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
// ILM should have unfollowed the follower index, so the following_index setting should have been removed:
|
||||
// (this controls whether the follow engine is used)
|
||||
assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue());
|
||||
// The next index should have been created on the follower as well
|
||||
indexExists(nextIndexName);
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
// And the previously-follower index should be in the warm phase
|
||||
assertILMPolicy(client(), indexName, policyName, "warm");
|
||||
});
|
||||
|
||||
// Clean up
|
||||
leaderClient.performRequest(new Request("DELETE", "/_template/my_template"));
|
||||
}
|
||||
} else {
|
||||
fail("unexpected target cluster [" + targetCluster + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private static void putILMPolicy(String name, String maxSize, Integer maxDocs, TimeValue maxAge) throws IOException {
|
||||
final Request request = new Request("PUT", "_ilm/policy/" + name);
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("policy");
|
||||
{
|
||||
builder.startObject("phases");
|
||||
{
|
||||
builder.startObject("hot");
|
||||
{
|
||||
builder.startObject("actions");
|
||||
{
|
||||
builder.startObject("rollover");
|
||||
if (maxSize != null) {
|
||||
builder.field("max_size", maxSize);
|
||||
}
|
||||
if (maxAge != null) {
|
||||
builder.field("max_age", maxAge);
|
||||
}
|
||||
if (maxDocs != null) {
|
||||
builder.field("max_docs", maxDocs);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
{
|
||||
builder.startObject("unfollow");
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
builder.startObject("warm");
|
||||
{
|
||||
builder.startObject("actions");
|
||||
{
|
||||
builder.startObject("readonly");
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
builder.startObject("delete");
|
||||
{
|
||||
builder.field("min_age", "7d");
|
||||
builder.startObject("actions");
|
||||
{
|
||||
builder.startObject("delete");
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
request.setJsonEntity(Strings.toString(builder));
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void assertILMPolicy(RestClient client, String index, String policy, String expectedPhase) throws IOException {
|
||||
final Request request = new Request("GET", "/" + index + "/_ilm/explain");
|
||||
Map<String, Object> response = toMap(client.performRequest(request));
|
||||
LOGGER.info("response={}", response);
|
||||
Map<?, ?> explanation = (Map<?, ?>) ((Map<?, ?>) response.get("indices")).get(index);
|
||||
assertThat(explanation.get("managed"), is(true));
|
||||
assertThat(explanation.get("policy"), equalTo(policy));
|
||||
assertThat(explanation.get("phase"), equalTo(expectedPhase));
|
||||
}
|
||||
|
||||
private static void updateIndexSettings(RestClient client, String index, Settings settings) throws IOException {
|
||||
final Request request = new Request("PUT", "/" + index + "/_settings");
|
||||
request.setJsonEntity(Strings.toString(settings));
|
||||
assertOK(client.performRequest(request));
|
||||
}
|
||||
|
||||
private static Object getIndexSetting(RestClient client, String index, String setting) throws IOException {
|
||||
Request request = new Request("GET", "/" + index + "/_settings");
|
||||
request.addParameter("flat_settings", "true");
|
||||
Map<String, Object> response = toMap(client.performRequest(request));
|
||||
Map<?, ?> settings = (Map<?, ?>) ((Map<?, ?>) response.get(index)).get("settings");
|
||||
return settings.get(setting);
|
||||
}
|
||||
|
||||
private static void assertDocumentExists(RestClient client, String index, String id) throws IOException {
|
||||
Request request = new Request("HEAD", "/" + index + "/_doc/" + id);
|
||||
Response response = client.performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
|
||||
}
|
||||
|
||||
}
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
|
@ -161,7 +162,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -85,7 +86,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new)
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -102,7 +104,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue