From bdefcbdcd6d38a6af15019cd932424c507f61402 Mon Sep 17 00:00:00 2001 From: Andy Bristol Date: Mon, 21 Aug 2017 17:09:40 -0700 Subject: [PATCH] reroute API: log messages from commands (#25955) Gives allocation commands from the cluster reroute API the ability to provide messages to be logged once the cluster state change has been committed. The purpose of this change is to create a record in the logs when allocation commands which could potentially be destructive are applied. The allocate_empty_primary and allocate_stale_primary commands are the only ones that currently provide log messages. Closes #22821 --- .../TransportClusterRerouteAction.java | 12 ++- .../allocation/RoutingExplanations.java | 15 ++++ ...AllocateEmptyPrimaryAllocationCommand.java | 19 +++-- ...AllocateStalePrimaryAllocationCommand.java | 11 ++- .../allocation/command/AllocationCommand.java | 9 ++ .../cluster/allocation/ClusterRerouteIT.java | 84 +++++++++++++++++++ 6 files changed, 142 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 6fe497dd16f..6e4d628ea5f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -68,8 +68,18 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction listener) { + ActionListener logWrapper = ActionListener.wrap( + response -> { + if (request.dryRun() == false) { + response.getExplanations().getYesDecisionMessages().forEach(logger::info); + } + listener.onResponse(response); + }, + listener::onFailure + ); + clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, - allocationService, request, listener)); + 
allocationService, request, logWrapper)); } static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingExplanations.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingExplanations.java index bd9ef7c95c7..fe97b524298 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingExplanations.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingExplanations.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent.Params; @@ -28,6 +29,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; /** * Class used to encapsulate a number of {@link RerouteExplanation} @@ -49,6 +52,18 @@ public class RoutingExplanations implements ToXContentFragment { return this.explanations; } + /** + * Provides feedback from commands with a YES decision that should be displayed to the user after the command has been applied + */ + public List getYesDecisionMessages() { + return explanations().stream() + .filter(explanation -> explanation.decisions().type().equals(Decision.Type.YES)) + .map(explanation -> explanation.command().getMessage()) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + /** * Read in a RoutingExplanations object */ diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java 
b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 157acc0e537..66281b73458 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.Optional; /** * Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss. @@ -72,6 +73,11 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation return NAME; } + @Override + public Optional<String> getMessage() { + return Optional.of("allocated an empty primary for [" + index + "][" + shardId + "] on node [" + node + "] from user command"); + } + public static AllocateEmptyPrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException { return new Builder().parse(parser).build(); } @@ -115,19 +121,22 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation } if (shardRouting.recoverySource().getType() != RecoverySource.Type.EMPTY_STORE && acceptDataLoss == false) { - return explainOrThrowRejectedCommand(explain, allocation, - "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true"); + String dataLossWarning = "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. 
Please confirm " + + "by setting the accept_data_loss parameter to true"; + return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning); } UnassignedInfo unassignedInfoToUpdate = null; if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY) { - unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, - "force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(), + String unassignedInfoMessage = "force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + + ", " + shardRouting.unassignedInfo().getMessage(); + unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, shardRouting.unassignedInfo().getLastAllocationStatus()); } - initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, StoreRecoverySource.EMPTY_STORE_INSTANCE); + initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, + StoreRecoverySource.EMPTY_STORE_INSTANCE); return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders")); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java index c643fb5c948..11c4420200e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java @@ -35,6 +35,7 @@ import 
org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.Optional; /** * Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss. @@ -70,6 +71,11 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation return NAME; } + @Override + public Optional<String> getMessage() { + return Optional.of("allocated a stale primary for [" + index + "][" + shardId + "] on node [" + node + "] from user command"); + } + public static AllocateStalePrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException { return new Builder().parse(parser).build(); } @@ -113,8 +119,9 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation } if (acceptDataLoss == false) { - return explainOrThrowRejectedCommand(explain, allocation, - "allocating an empty primary for [" + index + "][" + shardId + "] can result in data loss. Please confirm by setting the accept_data_loss parameter to true"); + String dataLossWarning = "allocating a stale primary for [" + index + "][" + shardId + "] can result in data loss. 
Please " + + "confirm by setting the accept_data_loss parameter to true"; + return explainOrThrowRejectedCommand(explain, allocation, dataLossWarning); } if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java index 92c1ffa9921..2a895672c90 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Optional; /** * A command to move shards in some way. @@ -61,4 +62,12 @@ public interface AllocationCommand extends NamedWriteable, ToXContent { default String getWriteableName() { return name(); } + + /** + * Returns any feedback the command wants to provide for logging. 
This message should be appropriate to expose to the user after the + * command has been applied + */ + default Optional<String> getMessage() { + return Optional.empty(); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index b6b6b3024b4..0522f3f15f8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -19,20 +19,24 @@ package org.elasticsearch.cluster.allocation; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ 
-50,6 +54,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.MockLogAppender; import java.nio.file.Path; import java.util.Arrays; @@ -63,6 +68,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -304,6 +310,84 @@ public class ClusterRerouteIT extends ESIntegTestCase { assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES)); } + public void testMessageLogging() throws Exception{ + final Settings settings = Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE.name()) + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name()) + .build(); + + final String nodeName1 = internalCluster().startNode(settings); + assertThat(cluster().size(), equalTo(1)); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1") + .execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + final String nodeName2 = internalCluster().startNode(settings); + assertThat(cluster().size(), equalTo(2)); + healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + final String indexName = "test_index"; + 
client().admin().indices().prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)) + .execute().actionGet(); + + Logger actionLogger = Loggers.getLogger(TransportClusterRerouteAction.class); + + MockLogAppender dryRunMockLog = new MockLogAppender(); + dryRunMockLog.start(); + dryRunMockLog.addExpectation( + new MockLogAppender.UnseenEventExpectation("no completed message logged on dry run", + TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*") + ); + Loggers.addAppender(actionLogger, dryRunMockLog); + + AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true); + ClusterRerouteResponse dryRunResponse = client().admin().cluster().prepareReroute() + .setExplain(randomBoolean()) + .setDryRun(true) + .add(dryRunAllocation) + .execute().actionGet(); + + // during a dry run, messages exist but are not logged or exposed + assertThat(dryRunResponse.getExplanations().getYesDecisionMessages(), hasSize(1)); + assertThat(dryRunResponse.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary")); + + dryRunMockLog.assertAllExpectationsMatched(); + dryRunMockLog.stop(); + Loggers.removeAppender(actionLogger, dryRunMockLog); + + MockLogAppender allocateMockLog = new MockLogAppender(); + allocateMockLog.start(); + allocateMockLog.addExpectation( + new MockLogAppender.SeenEventExpectation("message for first allocate empty primary", + TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName1 + "*") + ); + allocateMockLog.addExpectation( + new MockLogAppender.UnseenEventExpectation("no message for second allocate empty primary", + TransportClusterRerouteAction.class.getName(), Level.INFO, "allocated an empty primary*" + nodeName2 + "*") + 
); + Loggers.addAppender(actionLogger, allocateMockLog); + + AllocationCommand yesDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true); + AllocationCommand noDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand("noexist", 1, nodeName2, true); + ClusterRerouteResponse response = client().admin().cluster().prepareReroute() + .setExplain(true) // so we get a NO decision back rather than an exception + .add(yesDecisionAllocation) + .add(noDecisionAllocation) + .execute().actionGet(); + + assertThat(response.getExplanations().getYesDecisionMessages(), hasSize(1)); + assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary")); + assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString(nodeName1)); + + allocateMockLog.assertAllExpectationsMatched(); + allocateMockLog.stop(); + Loggers.removeAppender(actionLogger, allocateMockLog); + } + public void testClusterRerouteWithBlocks() throws Exception { List nodesIds = internalCluster().startNodes(2);