Merge remote-tracking branch 'dakrone/allocation-explain-show-delay'
This commit is contained in:
commit
82c2b1e48e
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
@ -47,6 +48,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
|
||||||
private final Map<DiscoveryNode, Decision> nodeToDecision;
|
private final Map<DiscoveryNode, Decision> nodeToDecision;
|
||||||
private final Map<DiscoveryNode, Float> nodeWeights;
|
private final Map<DiscoveryNode, Float> nodeWeights;
|
||||||
private final UnassignedInfo unassignedInfo;
|
private final UnassignedInfo unassignedInfo;
|
||||||
|
private final long remainingDelayNanos;
|
||||||
|
|
||||||
public ClusterAllocationExplanation(StreamInput in) throws IOException {
|
public ClusterAllocationExplanation(StreamInput in) throws IOException {
|
||||||
this.shard = ShardId.readShardId(in);
|
this.shard = ShardId.readShardId(in);
|
||||||
|
@ -73,17 +75,19 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
|
||||||
ntw.put(dn, weight);
|
ntw.put(dn, weight);
|
||||||
}
|
}
|
||||||
this.nodeWeights = ntw;
|
this.nodeWeights = ntw;
|
||||||
|
remainingDelayNanos = in.readVLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
|
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
|
||||||
UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
|
UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
|
||||||
Map<DiscoveryNode, Float> nodeWeights) {
|
Map<DiscoveryNode, Float> nodeWeights, long remainingDelayNanos) {
|
||||||
this.shard = shard;
|
this.shard = shard;
|
||||||
this.primary = primary;
|
this.primary = primary;
|
||||||
this.assignedNodeId = assignedNodeId;
|
this.assignedNodeId = assignedNodeId;
|
||||||
this.unassignedInfo = unassignedInfo;
|
this.unassignedInfo = unassignedInfo;
|
||||||
this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
|
this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
|
||||||
this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights;
|
this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights;
|
||||||
|
this.remainingDelayNanos = remainingDelayNanos;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardId getShard() {
|
public ShardId getShard() {
|
||||||
|
@ -124,6 +128,11 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
|
||||||
return this.nodeWeights;
|
return this.nodeWeights;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Return the remaining allocation delay for this shard in nanoseconds */
|
||||||
|
public long getRemainingDelayNanos() {
|
||||||
|
return this.remainingDelayNanos;
|
||||||
|
}
|
||||||
|
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject(); {
|
builder.startObject(); {
|
||||||
builder.startObject("shard"); {
|
builder.startObject("shard"); {
|
||||||
|
@ -141,6 +150,11 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
|
||||||
// If we have unassigned info, show that
|
// If we have unassigned info, show that
|
||||||
if (unassignedInfo != null) {
|
if (unassignedInfo != null) {
|
||||||
unassignedInfo.toXContent(builder, params);
|
unassignedInfo.toXContent(builder, params);
|
||||||
|
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
|
||||||
|
builder.field("allocation_delay", TimeValue.timeValueNanos(delay));
|
||||||
|
builder.field("allocation_delay_ms", TimeValue.timeValueNanos(delay).millis());
|
||||||
|
builder.field("remaining_delay", TimeValue.timeValueNanos(remainingDelayNanos));
|
||||||
|
builder.field("remaining_delay_ms", TimeValue.timeValueNanos(remainingDelayNanos).millis());
|
||||||
}
|
}
|
||||||
builder.startObject("nodes");
|
builder.startObject("nodes");
|
||||||
for (Map.Entry<DiscoveryNode, Float> entry : nodeWeights.entrySet()) {
|
for (Map.Entry<DiscoveryNode, Float> entry : nodeWeights.entrySet()) {
|
||||||
|
@ -194,5 +208,6 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
|
||||||
entry.getKey().writeTo(out);
|
entry.getKey().writeTo(out);
|
||||||
out.writeFloat(entry.getValue());
|
out.writeFloat(entry.getValue());
|
||||||
}
|
}
|
||||||
|
out.writeVLong(remainingDelayNanos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,8 +139,14 @@ public class TransportClusterAllocationExplainAction
|
||||||
nodeToDecision.put(discoNode, d);
|
nodeToDecision.put(discoNode, d);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
long remainingDelayNanos = 0;
|
||||||
|
if (ui != null) {
|
||||||
|
final MetaData metadata = allocation.metaData();
|
||||||
|
final Settings indexSettings = metadata.index(shard.index()).getSettings();
|
||||||
|
remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
|
||||||
|
}
|
||||||
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
|
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
|
||||||
shardAllocator.weighShard(allocation, shard));
|
shardAllocator.weighShard(allocation, shard), remainingDelayNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -231,20 +231,28 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
||||||
return lastComputedLeftDelayNanos;
|
return lastComputedLeftDelayNanos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the delay left based on current time (in nanoseconds) and index/node settings.
|
||||||
|
*
|
||||||
|
* @return calculated delay in nanoseconds
|
||||||
|
*/
|
||||||
|
public long getRemainingDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
|
||||||
|
final long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
|
||||||
|
if (delayTimeoutNanos == 0L) {
|
||||||
|
return 0L;
|
||||||
|
} else {
|
||||||
|
assert nanoTimeNow >= unassignedTimeNanos;
|
||||||
|
return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates delay left based on current time (in nanoseconds) and index/node settings.
|
* Updates delay left based on current time (in nanoseconds) and index/node settings.
|
||||||
*
|
*
|
||||||
* @return updated delay in nanoseconds
|
* @return updated delay in nanoseconds
|
||||||
*/
|
*/
|
||||||
public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) {
|
public long updateDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
|
||||||
long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
|
final long newComputedLeftDelayNanos = getRemainingDelay(nanoTimeNow, settings, indexSettings);
|
||||||
final long newComputedLeftDelayNanos;
|
|
||||||
if (delayTimeoutNanos == 0L) {
|
|
||||||
newComputedLeftDelayNanos = 0L;
|
|
||||||
} else {
|
|
||||||
assert nanoTimeNow >= unassignedTimeNanos;
|
|
||||||
newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
|
|
||||||
}
|
|
||||||
lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
|
lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
|
||||||
return newComputedLeftDelayNanos;
|
return newComputedLeftDelayNanos;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.action.admin.cluster.allocation;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for the cluster allocation explanation
|
||||||
|
*/
|
||||||
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||||
|
public final class ClusterAllocationExplainIT extends ESIntegTestCase {
|
||||||
|
public void testDelayShards() throws Exception {
|
||||||
|
logger.info("--> starting 3 nodes");
|
||||||
|
List<String> nodes = internalCluster().startNodesAsync(3).get();
|
||||||
|
|
||||||
|
// Wait for all 3 nodes to be up
|
||||||
|
logger.info("--> waiting for 3 nodes to be up");
|
||||||
|
assertBusy(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
|
||||||
|
assertThat(resp.getNodes().length, equalTo(3));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("--> creating 'test' index");
|
||||||
|
prepareCreate("test").setSettings(Settings.settingsBuilder()
|
||||||
|
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m")
|
||||||
|
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5)
|
||||||
|
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).get();
|
||||||
|
ensureGreen("test");
|
||||||
|
|
||||||
|
logger.info("--> stopping a random node");
|
||||||
|
assertTrue(internalCluster().stopRandomDataNode());
|
||||||
|
|
||||||
|
ensureYellow("test");
|
||||||
|
|
||||||
|
ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain().useAnyUnassignedShard().get();
|
||||||
|
ClusterAllocationExplanation cae = resp.getExplanation();
|
||||||
|
assertThat(cae.getShard().getIndexName(), equalTo("test"));
|
||||||
|
assertFalse(cae.isPrimary());
|
||||||
|
assertFalse(cae.isAssigned());
|
||||||
|
assertThat("expecting a remaining delay, got: " + cae.getRemainingDelayNanos(), cae.getRemainingDelayNanos(), greaterThan(0L));
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,9 +64,10 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
|
||||||
nodeToDecisions.put(dn, d);
|
nodeToDecisions.put(dn, d);
|
||||||
nodeToWeight.put(dn, randomFloat());
|
nodeToWeight.put(dn, randomFloat());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long remainingDelay = randomIntBetween(0, 500);
|
||||||
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null,
|
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null,
|
||||||
nodeToDecisions, nodeToWeight);
|
nodeToDecisions, nodeToWeight, remainingDelay);
|
||||||
BytesStreamOutput out = new BytesStreamOutput();
|
BytesStreamOutput out = new BytesStreamOutput();
|
||||||
cae.writeTo(out);
|
cae.writeTo(out);
|
||||||
StreamInput in = StreamInput.wrap(out.bytes());
|
StreamInput in = StreamInput.wrap(out.bytes());
|
||||||
|
@ -80,5 +81,6 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
|
||||||
assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue());
|
assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue());
|
||||||
}
|
}
|
||||||
assertEquals(nodeToWeight, cae2.getNodeWeights());
|
assertEquals(nodeToWeight, cae2.getNodeWeights());
|
||||||
|
assertEquals(remainingDelay, cae2.getRemainingDelayNanos());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue