reroute API: log messages from commands (#25955)

Gives allocation commands from the cluster reroute API
the ability to provide messages to be logged once the 
cluster state change has been committed. 

The purpose of this change is to create a record in the 
logs when allocation commands which could potentially
be destructive are applied. The allocate_empty_primary
and allocate_stale_primary commands are the only ones
that currently provide log messages.

Closes #22821
This commit is contained in:
Andy Bristol 2017-08-21 17:09:40 -07:00 committed by GitHub
parent 0120448f76
commit bdefcbdcd6
6 changed files with 142 additions and 8 deletions

View File

@ -68,8 +68,18 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
@Override @Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) { protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
},
listener::onFailure
);
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, listener)); allocationService, request, logWrapper));
} }
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> { static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContent.Params;
@ -28,6 +29,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/** /**
* Class used to encapsulate a number of {@link RerouteExplanation} * Class used to encapsulate a number of {@link RerouteExplanation}
@ -49,6 +52,18 @@ public class RoutingExplanations implements ToXContentFragment {
return this.explanations; return this.explanations;
} }
/**
 * Collects, in explanation order, the messages supplied by commands whose decision type is YES. Commands that supply no
 * message are skipped. The returned messages are meant to be shown to the user after the command has been applied.
 */
public List<String> getYesDecisionMessages() {
    final List<String> messages = new ArrayList<>();
    for (RerouteExplanation explanation : explanations()) {
        // only commands that were accepted contribute feedback
        if (explanation.decisions().type().equals(Decision.Type.YES) == false) {
            continue;
        }
        final Optional<String> message = explanation.command().getMessage();
        if (message.isPresent()) {
            messages.add(message.get());
        }
    }
    return messages;
}
/** /**
* Read in a RoutingExplanations object * Read in a RoutingExplanations object
*/ */

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
/** /**
* Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss. * Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss.
@ -72,6 +73,11 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
return NAME; return NAME;
} }
/**
 * Describes the empty-primary allocation requested by this command, for logging once the command has been applied.
 *
 * @return a present message naming the index, the shard id and the target node
 */
@Override
public Optional<String> getMessage() {
    final String summary = "allocated an empty primary for [" + index + "][" + shardId + "] on node [" + node
        + "] from user command";
    return Optional.of(summary);
}
public static AllocateEmptyPrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException { public static AllocateEmptyPrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
return new Builder().parse(parser).build(); return new Builder().parse(parser).build();
} }
@ -115,19 +121,22 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
} }
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EMPTY_STORE && acceptDataLoss == false) { if (shardRouting.recoverySource().getType() != RecoverySource.Type.EMPTY_STORE && acceptDataLoss == false) {
return explainOrThrowRejectedCommand(explain, allocation, String dataLossWarning = "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm " +
"allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true"); "by setting the accept_data_loss parameter to true";
return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning);
} }
UnassignedInfo unassignedInfoToUpdate = null; UnassignedInfo unassignedInfoToUpdate = null;
if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY) { if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY) {
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, String unassignedInfoMessage = "force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() +
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(), ", " + shardRouting.unassignedInfo().getMessage();
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage,
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false,
shardRouting.unassignedInfo().getLastAllocationStatus()); shardRouting.unassignedInfo().getLastAllocationStatus());
} }
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, StoreRecoverySource.EMPTY_STORE_INSTANCE); initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
StoreRecoverySource.EMPTY_STORE_INSTANCE);
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders")); return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
/** /**
* Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss. * Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss.
@ -70,6 +71,11 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
return NAME; return NAME;
} }
/**
 * Describes the stale-primary allocation requested by this command, for logging once the command has been applied.
 *
 * @return a present message naming the index, the shard id and the target node
 */
@Override
public Optional<String> getMessage() {
    final String summary = "allocated a stale primary for [" + index + "][" + shardId + "] on node [" + node
        + "] from user command";
    return Optional.of(summary);
}
public static AllocateStalePrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException { public static AllocateStalePrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
return new Builder().parse(parser).build(); return new Builder().parse(parser).build();
} }
@ -113,8 +119,9 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
} }
if (acceptDataLoss == false) { if (acceptDataLoss == false) {
return explainOrThrowRejectedCommand(explain, allocation, String dataLossWarning = "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please " +
"allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true"); "confirm by setting the accept_data_loss parameter to true";
return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning);
} }
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE) { if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
/** /**
* A command to move shards in some way. * A command to move shards in some way.
@ -61,4 +62,12 @@ public interface AllocationCommand extends NamedWriteable, ToXContent {
default String getWriteableName() { default String getWriteableName() {
return name(); return name();
} }
/**
* Returns any feedback the command wants to provide for logging. This message should be appropriate to expose to the user after the
* command has been applied.
*
* @return the feedback message, if the command has one; this default implementation always returns {@link Optional#empty()}
*/
default Optional<String> getMessage() {
return Optional.empty();
}
} }

View File

@ -19,20 +19,24 @@
package org.elasticsearch.cluster.allocation; package org.elasticsearch.cluster.allocation;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -50,6 +54,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
@ -63,6 +68,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
@ -304,6 +310,84 @@ public class ClusterRerouteIT extends ESIntegTestCase {
assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES)); assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES));
} }
/**
 * Checks which allocation command messages reach the {@link TransportClusterRerouteAction} logger at INFO level:
 * none during a dry run, and on a real reroute only the message of the command that received a YES decision.
 * In both cases the messages remain available from the response's explanations.
 */
public void testMessageLogging() throws Exception {
    final Settings settings = Settings.builder()
        .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE.name())
        .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name())
        .build();

    final String nodeName1 = internalCluster().startNode(settings);
    assertThat(cluster().size(), equalTo(1));
    ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1")
        .execute().actionGet();
    assertThat(healthResponse.isTimedOut(), equalTo(false));

    final String nodeName2 = internalCluster().startNode(settings);
    assertThat(cluster().size(), equalTo(2));
    healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
    assertThat(healthResponse.isTimedOut(), equalTo(false));

    final String indexName = "test_index";
    client().admin().indices().prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.NONE)
        .setSettings(Settings.builder()
            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1))
        .execute().actionGet();

    Logger actionLogger = Loggers.getLogger(TransportClusterRerouteAction.class);

    MockLogAppender dryRunMockLog = new MockLogAppender();
    dryRunMockLog.start();
    dryRunMockLog.addExpectation(
        new MockLogAppender.UnseenEventExpectation("no completed message logged on dry run",
            TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*")
    );
    Loggers.addAppender(actionLogger, dryRunMockLog);
    try {
        AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
        ClusterRerouteResponse dryRunResponse = client().admin().cluster().prepareReroute()
            .setExplain(randomBoolean())
            .setDryRun(true)
            .add(dryRunAllocation)
            .execute().actionGet();

        // during a dry run, messages exist but are not logged or exposed
        assertThat(dryRunResponse.getExplanations().getYesDecisionMessages(), hasSize(1));
        assertThat(dryRunResponse.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));

        dryRunMockLog.assertAllExpectationsMatched();
    } finally {
        // always detach, even when an assertion fails, so the appender does not stay on the shared logger;
        // detach before stopping so the logger never holds a stopped appender
        Loggers.removeAppender(actionLogger, dryRunMockLog);
        dryRunMockLog.stop();
    }

    MockLogAppender allocateMockLog = new MockLogAppender();
    allocateMockLog.start();
    allocateMockLog.addExpectation(
        new MockLogAppender.SeenEventExpectation("message for first allocate empty primary",
            TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName1 + "*")
    );
    allocateMockLog.addExpectation(
        new MockLogAppender.UnseenEventExpectation("no message for second allocate empty primary",
            TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName2 + "*")
    );
    Loggers.addAppender(actionLogger, allocateMockLog);
    try {
        AllocationCommand yesDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
        AllocationCommand noDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand("noexist", 1, nodeName2, true);
        ClusterRerouteResponse response = client().admin().cluster().prepareReroute()
            .setExplain(true) // so we get a NO decision back rather than an exception
            .add(yesDecisionAllocation)
            .add(noDecisionAllocation)
            .execute().actionGet();

        assertThat(response.getExplanations().getYesDecisionMessages(), hasSize(1));
        assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));
        assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString(nodeName1));

        allocateMockLog.assertAllExpectationsMatched();
    } finally {
        // same cleanup guarantee as for the dry-run appender
        Loggers.removeAppender(actionLogger, allocateMockLog);
        allocateMockLog.stop();
    }
}
public void testClusterRerouteWithBlocks() throws Exception { public void testClusterRerouteWithBlocks() throws Exception {
List<String> nodesIds = internalCluster().startNodes(2); List<String> nodesIds = internalCluster().startNodes(2);