Make AllocationCommands register in the same way as queries

The cluster reroute API had a copy of NamedWriteableRegistry's behavior
inside it in the form of AllocationCommands#registerFactory and
AllocationCommands#lookupFactorySafe. There isn't a reason to duplicate
that effort. So this replaces all of AllocationCommand#Factory with
query-like registration in NetworkModule. Why NetworkModule? Because
the transport client needs it.
This commit is contained in:
Nik Everett 2016-04-11 15:14:11 -04:00
parent 79fbf91ece
commit fe4d1297df
19 changed files with 404 additions and 358 deletions

View File

@ -23,20 +23,24 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommandRegistry;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
/**
* Request to submit cluster reroute allocation commands
*/
public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteRequest> {
AllocationCommands commands = new AllocationCommands();
boolean dryRun;
boolean explain;
@ -87,11 +91,21 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
return this.explain;
}
/**
* Set the allocation commands to execute.
*/
public ClusterRerouteRequest commands(AllocationCommand... commands) {
this.commands = new AllocationCommands(commands);
return this;
}
/**
* Sets the source for the request.
*/
public ClusterRerouteRequest source(BytesReference source) throws Exception {
public ClusterRerouteRequest source(BytesReference source, AllocationCommandRegistry registry, ParseFieldMatcher parseFieldMatcher)
throws Exception {
try (XContentParser parser = XContentHelper.createParser(source)) {
parser.setParseFieldMatcher(parseFieldMatcher);
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -99,7 +113,7 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("commands".equals(currentFieldName)) {
this.commands = AllocationCommands.fromXContent(parser);
this.commands = AllocationCommands.fromXContent(parser, registry);
} else {
throw new ElasticsearchParseException("failed to parse reroute request, got start array with wrong field name [{}]", currentFieldName);
}

View File

@ -61,10 +61,10 @@ public class ClusterRerouteRequestBuilder extends AcknowledgedRequestBuilder<Clu
}
/**
* Sets the source for the request
* Sets the commands for the request to execute.
*/
public ClusterRerouteRequestBuilder setSource(BytesReference source) throws Exception {
request.source(source);
public ClusterRerouteRequestBuilder setCommands(AllocationCommand... commands) throws Exception {
request.commands(commands);
return this;
}
}

View File

@ -52,15 +52,13 @@ public class RerouteExplanation implements ToXContent {
}
public static RerouteExplanation readFrom(StreamInput in) throws IOException {
String commandName = in.readString();
AllocationCommand command = AllocationCommands.lookupFactorySafe(commandName).readFrom(in);
AllocationCommand command = in.readAllocationCommand();
Decision decisions = Decision.readFrom(in);
return new RerouteExplanation(command, decisions);
}
public static void writeTo(RerouteExplanation explanation, StreamOutput out) throws IOException {
out.writeString(explanation.command.name());
AllocationCommands.lookupFactorySafe(explanation.command.name()).writeTo(explanation.command, out);
out.writeAllocationCommand(explanation.command);
Decision.writeTo(explanation.decisions, out);
}
@ -68,7 +66,7 @@ public class RerouteExplanation implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("command", command.name());
AllocationCommands.lookupFactorySafe(command.name()).toXContent(command, builder, params, "parameters");
builder.field("parameters", command);
// The Decision could be a Multi or Single decision, and they should
// both be encoded the same, so check and wrap in an array if necessary
if (decisions instanceof Decision.Multi) {

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.StreamableReader;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -42,21 +41,24 @@ import java.util.function.Consumer;
/**
* Abstract base class for allocating an unassigned shard to a node
*/
public abstract class AbstractAllocateAllocationCommand implements AllocationCommand, ToXContent {
public abstract class AbstractAllocateAllocationCommand implements AllocationCommand {
private static final String INDEX_KEY = "index";
private static final String SHARD_KEY = "shard";
private static final String NODE_KEY = "node";
private static final String INDEX_FIELD = "index";
private static final String SHARD_FIELD = "shard";
private static final String NODE_FIELD = "node";
protected static <T extends Builder> ObjectParser<T, Void> createAllocateParser(String command) {
protected static <T extends Builder<?>> ObjectParser<T, Void> createAllocateParser(String command) {
ObjectParser<T, Void> parser = new ObjectParser<>(command);
parser.declareString(Builder::setIndex, new ParseField(INDEX_KEY));
parser.declareInt(Builder::setShard, new ParseField(SHARD_KEY));
parser.declareString(Builder::setNode, new ParseField(NODE_KEY));
parser.declareString(Builder::setIndex, new ParseField(INDEX_FIELD));
parser.declareInt(Builder::setShard, new ParseField(SHARD_FIELD));
parser.declareString(Builder::setNode, new ParseField(NODE_FIELD));
return parser;
}
protected static abstract class Builder<T extends AbstractAllocateAllocationCommand> implements StreamableReader<Builder<T>> {
/**
* Works around ObjectParser not supporting constructor arguments.
*/
protected static abstract class Builder<T extends AbstractAllocateAllocationCommand> {
protected String index;
protected int shard = -1;
protected String node;
@ -73,14 +75,6 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
this.node = node;
}
@Override
public Builder<T> readFrom(StreamInput in) throws IOException {
index = in.readString();
shard = in.readVInt();
node = in.readString();
return this;
}
public abstract Builder<T> parse(XContentParser parser) throws IOException;
public abstract T build();
@ -98,50 +92,6 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field(INDEX_KEY, index());
builder.field(SHARD_KEY, shardId());
builder.field(NODE_KEY, node());
return builder;
}
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeString(node);
}
public static abstract class Factory<T extends AbstractAllocateAllocationCommand> implements AllocationCommand.Factory<T> {
protected abstract Builder<T> newBuilder();
@Override
public T readFrom(StreamInput in) throws IOException {
return newBuilder().readFrom(in).build();
}
@Override
public void writeTo(T command, StreamOutput out) throws IOException {
command.writeTo(out);
}
@Override
public T fromXContent(XContentParser parser) throws IOException {
return newBuilder().parse(parser).build();
}
@Override
public void toXContent(T command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.endObject();
}
}
protected final String index;
protected final int shardId;
protected final String node;
@ -152,6 +102,21 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
this.node = node;
}
/**
* Read from a stream.
*/
protected AbstractAllocateAllocationCommand(StreamInput in) throws IOException {
index = in.readString();
shardId = in.readVInt();
node = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeString(node);
}
/**
* Get the index name
@ -247,4 +212,17 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
}
assert false : "shard to initialize not found in list of unassigned shards";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(INDEX_FIELD, index());
builder.field(SHARD_FIELD, shardId());
builder.field(NODE_FIELD, node());
extraXContent(builder);
return builder.endObject();
}
protected void extraXContent(XContentBuilder builder) throws IOException {
}
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
@ -41,6 +43,7 @@ import java.io.IOException;
*/
public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocationCommand {
public static final String NAME = "allocate_empty_primary";
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);
private static final ObjectParser<Builder, Void> EMPTY_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
@ -55,11 +58,22 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
super(index, shardId, node, acceptDataLoss);
}
/**
* Read from a stream.
*/
public AllocateEmptyPrimaryAllocationCommand(StreamInput in) throws IOException {
super(in);
}
@Override
public String name() {
return NAME;
}
public static AllocateEmptyPrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
return new Builder().parse(parser).build();
}
public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateEmptyPrimaryAllocationCommand> {
@Override
@ -74,14 +88,6 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateEmptyPrimaryAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;

View File

@ -27,6 +27,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
@ -40,6 +42,7 @@ import java.util.List;
*/
public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocationCommand {
public static final String NAME = "allocate_replica";
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);
private static final ObjectParser<AllocateReplicaAllocationCommand.Builder, Void> REPLICA_PARSER = createAllocateParser(NAME);
@ -54,11 +57,22 @@ public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocation
super(index, shardId, node);
}
/**
* Read from a stream.
*/
public AllocateReplicaAllocationCommand(StreamInput in) throws IOException {
super(in);
}
@Override
public String name() {
return NAME;
}
public static AllocateReplicaAllocationCommand fromXContent(XContentParser parser) throws IOException {
return new Builder().parse(parser).build();
}
protected static class Builder extends AbstractAllocateAllocationCommand.Builder<AllocateReplicaAllocationCommand> {
@Override
@ -73,13 +87,6 @@ public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocation
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateReplicaAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;

View File

@ -27,6 +27,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
@ -40,6 +42,7 @@ import java.io.IOException;
*/
public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocationCommand {
public static final String NAME = "allocate_stale_primary";
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);
private static final ObjectParser<Builder, Void> STALE_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
@ -55,11 +58,22 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
super(index, shardId, node, acceptDataLoss);
}
/**
* Read from a stream.
*/
public AllocateStalePrimaryAllocationCommand(StreamInput in) throws IOException {
super(in);
}
@Override
public String name() {
return NAME;
}
public static AllocateStalePrimaryAllocationCommand fromXContent(XContentParser parser) throws IOException {
return new Builder().parse(parser).build();
}
public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateStalePrimaryAllocationCommand> {
@Override
@ -74,14 +88,6 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateStalePrimaryAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;

View File

@ -21,11 +21,8 @@ package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -33,47 +30,15 @@ import java.io.IOException;
/**
* This interface defines the basic methods of commands for allocation
*/
public interface AllocationCommand {
/**
* Factory to create {@link AllocationCommand}s
* @param <T> Type of {@link AllocationCommand}s created by this {@link Factory}
*/
interface Factory<T extends AllocationCommand> {
public interface AllocationCommand extends NamedWriteable<AllocationCommand>, ToXContent {
interface Parser<T extends AllocationCommand> {
/**
* Reads an {@link AllocationCommand} of type <code>T</code> from a {@link StreamInput}
* @param in {@link StreamInput} to read the {@link AllocationCommand} from
* @return {@link AllocationCommand} read from the {@link StreamInput}
* @throws IOException if something happens during reading
*/
T readFrom(StreamInput in) throws IOException;
/**
* Writes an {@link AllocationCommand} to a {@link StreamOutput}
* @param command {@link AllocationCommand} to write
* @param out {@link StreamOutput} to write the {@link AllocationCommand} to
* @throws IOException if something happens during writing the command
*/
void writeTo(T command, StreamOutput out) throws IOException;
/**
* Reads an {@link AllocationCommand} of type <code>T</code> from a {@link XContentParser}
* Reads an {@link AllocationCommand} of type <code>T</code> from a {@link XContentParser}.
* @param parser {@link XContentParser} to use
* @return {@link AllocationCommand} read
* @throws IOException if something happens during reading
*/
T fromXContent(XContentParser parser) throws IOException;
/**
* Writes an {@link AllocationCommand} using an {@link XContentBuilder}
* @param command {@link AllocationCommand} to write
* @param builder {@link XContentBuilder} to use
* @param params parameters to use when writing the command
* @param objectName object the encoding should be encased in, null means a plain object
* @throws IOException if something happens during writing the command
*/
void toXContent(T command, XContentBuilder builder, ToXContent.Params params, @Nullable String objectName) throws IOException;
}
/**
@ -88,4 +53,9 @@ public interface AllocationCommand {
* @throws org.elasticsearch.ElasticsearchException if something happens during reconfiguration
*/
RerouteExplanation execute(RoutingAllocation allocation, boolean explain);
@Override
default String getWriteableName() {
return name();
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.common.xcontent.ParseFieldRegistry;
/**
* Registry of allocation commands. This is it's own class just to make Guice happy.
*/
public class AllocationCommandRegistry extends ParseFieldRegistry<AllocationCommand.Parser<?>> {
public AllocationCommandRegistry() {
super("allocation_command");
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -32,48 +31,13 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A simple {@link AllocationCommand} composite managing several
* {@link AllocationCommand} implementations
*/
public class AllocationCommands {
private static Map<String, AllocationCommand.Factory> factories = new HashMap<>();
/**
* Register a custom index meta data factory. Make sure to call it from a static block.
*/
public static void registerFactory(String type, AllocationCommand.Factory factory) {
factories.put(type, factory);
}
@SuppressWarnings("unchecked")
@Nullable
public static <T extends AllocationCommand> AllocationCommand.Factory<T> lookupFactory(String name) {
return factories.get(name);
}
@SuppressWarnings("unchecked")
public static <T extends AllocationCommand> AllocationCommand.Factory<T> lookupFactorySafe(String name) {
AllocationCommand.Factory<T> factory = factories.get(name);
if (factory == null) {
throw new IllegalArgumentException("No allocation command factory registered for name [" + name + "]");
}
return factory;
}
static {
registerFactory(AllocateEmptyPrimaryAllocationCommand.NAME, new AllocateEmptyPrimaryAllocationCommand.Factory());
registerFactory(AllocateStalePrimaryAllocationCommand.NAME, new AllocateStalePrimaryAllocationCommand.Factory());
registerFactory(AllocateReplicaAllocationCommand.NAME, new AllocateReplicaAllocationCommand.Factory());
registerFactory(CancelAllocationCommand.NAME, new CancelAllocationCommand.Factory());
registerFactory(MoveAllocationCommand.NAME, new MoveAllocationCommand.Factory());
}
private final List<AllocationCommand> commands = new ArrayList<>();
/**
@ -131,8 +95,7 @@ public class AllocationCommands {
AllocationCommands commands = new AllocationCommands();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
String name = in.readString();
commands.add(lookupFactorySafe(name).readFrom(in));
commands.add(in.readAllocationCommand());
}
return commands;
}
@ -147,8 +110,7 @@ public class AllocationCommands {
public static void writeTo(AllocationCommands commands, StreamOutput out) throws IOException {
out.writeVInt(commands.commands.size());
for (AllocationCommand command : commands.commands) {
out.writeString(command.name());
lookupFactorySafe(command.name()).writeTo(command, out);
out.writeAllocationCommand(command);
}
}
@ -162,10 +124,11 @@ public class AllocationCommands {
* }
* </pre>
* @param parser {@link XContentParser} to read the commands from
* @param registry of allocation command parsers
* @return {@link AllocationCommands} read
* @throws IOException if something bad happens while reading the stream
*/
public static AllocationCommands fromXContent(XContentParser parser) throws IOException {
public static AllocationCommands fromXContent(XContentParser parser, AllocationCommandRegistry registry) throws IOException {
AllocationCommands commands = new AllocationCommands();
XContentParser.Token token = parser.currentToken();
@ -194,7 +157,7 @@ public class AllocationCommands {
token = parser.nextToken();
String commandName = parser.currentName();
token = parser.nextToken();
commands.add(AllocationCommands.lookupFactorySafe(commandName).fromXContent(parser));
commands.add(registry.lookup(commandName, parser).fromXContent(parser));
// move to the end object one
if (parser.nextToken() != XContentParser.Token.END_OBJECT) {
throw new ElasticsearchParseException("allocation command is malformed, done parsing a command, but didn't get END_OBJECT, got [{}] instead", token);
@ -218,8 +181,7 @@ public class AllocationCommands {
builder.startArray("commands");
for (AllocationCommand command : commands.commands) {
builder.startObject();
builder.field(command.name());
AllocationCommands.lookupFactorySafe(command.name()).toXContent(command, builder, params, null);
builder.field(command.name(), command);
builder.endObject();
}
builder.endArray();

View File

@ -23,9 +23,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -34,11 +32,11 @@ import java.io.IOException;
*/
public abstract class BasePrimaryAllocationCommand extends AbstractAllocateAllocationCommand {
private static final String ACCEPT_DATA_LOSS_KEY = "accept_data_loss";
private static final String ACCEPT_DATA_LOSS_FIELD = "accept_data_loss";
protected static <T extends Builder> ObjectParser<T, Void> createAllocatePrimaryParser(String command) {
protected static <T extends Builder<?>> ObjectParser<T, Void> createAllocatePrimaryParser(String command) {
ObjectParser<T, Void> parser = AbstractAllocateAllocationCommand.createAllocateParser(command);
parser.declareBoolean(Builder::setAcceptDataLoss, new ParseField(ACCEPT_DATA_LOSS_KEY));
parser.declareBoolean(Builder::setAcceptDataLoss, new ParseField(ACCEPT_DATA_LOSS_FIELD));
return parser;
}
@ -49,6 +47,20 @@ public abstract class BasePrimaryAllocationCommand extends AbstractAllocateAlloc
this.acceptDataLoss = acceptDataLoss;
}
/**
* Read from a stream.
*/
protected BasePrimaryAllocationCommand(StreamInput in) throws IOException {
super(in);
acceptDataLoss = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acceptDataLoss);
}
/**
* The operation only executes if the user explicitly agrees to possible data loss
*
@ -64,25 +76,10 @@ public abstract class BasePrimaryAllocationCommand extends AbstractAllocateAlloc
public void setAcceptDataLoss(boolean acceptDataLoss) {
this.acceptDataLoss = acceptDataLoss;
}
@Override
public Builder readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acceptDataLoss = in.readBoolean();
return this;
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.toXContent(builder, params);
builder.field(ACCEPT_DATA_LOSS_KEY, acceptDataLoss);
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acceptDataLoss);
protected void extraXContent(XContentBuilder builder) throws IOException {
builder.field(ACCEPT_DATA_LOSS_FIELD, acceptDataLoss);
}
}

View File

@ -28,9 +28,9 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -44,80 +44,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
public class CancelAllocationCommand implements AllocationCommand {
public static final String NAME = "cancel";
/**
* Factory creating {@link CancelAllocationCommand}s
*/
public static class Factory implements AllocationCommand.Factory<CancelAllocationCommand> {
@Override
public CancelAllocationCommand readFrom(StreamInput in) throws IOException {
return new CancelAllocationCommand(in.readString(), in.readVInt(), in.readString(), in.readBoolean());
}
@Override
public void writeTo(CancelAllocationCommand command, StreamOutput out) throws IOException {
out.writeString(command.index());
out.writeVInt(command.shardId());
out.writeString(command.node());
out.writeBoolean(command.allowPrimary());
}
@Override
public CancelAllocationCommand fromXContent(XContentParser parser) throws IOException {
String index = null;
int shardId = -1;
String nodeId = null;
boolean allowPrimary = false;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("node".equals(currentFieldName)) {
nodeId = parser.text();
} else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) {
allowPrimary = parser.booleanValue();
} else {
throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
}
} else {
throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
}
}
if (index == null) {
throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
}
if (shardId == -1) {
throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
}
if (nodeId == null) {
throw new ElasticsearchParseException("[{}] command missing the node parameter", NAME);
}
return new CancelAllocationCommand(index, shardId, nodeId, allowPrimary);
}
@Override
public void toXContent(CancelAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.index());
builder.field("shard", command.shardId());
builder.field("node", command.node());
builder.field("allow_primary", command.allowPrimary());
builder.endObject();
}
}
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);
private final String index;
private final int shardId;
@ -138,6 +65,24 @@ public class CancelAllocationCommand implements AllocationCommand {
this.allowPrimary = allowPrimary;
}
/**
* Read from a stream.
*/
public CancelAllocationCommand(StreamInput in) throws IOException {
index = in.readString();
shardId = in.readVInt();
node = in.readString();
allowPrimary = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeString(node);
out.writeBoolean(allowPrimary);
}
@Override
public String name() {
return NAME;
@ -246,4 +191,53 @@ public class CancelAllocationCommand implements AllocationCommand {
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index());
builder.field("shard", shardId());
builder.field("node", node());
builder.field("allow_primary", allowPrimary());
return builder.endObject();
}
public static CancelAllocationCommand fromXContent(XContentParser parser) throws IOException {
String index = null;
int shardId = -1;
String nodeId = null;
boolean allowPrimary = false;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("node".equals(currentFieldName)) {
nodeId = parser.text();
} else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) {
allowPrimary = parser.booleanValue();
} else {
throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
}
} else {
throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
}
}
if (index == null) {
throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
}
if (shardId == -1) {
throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
}
if (nodeId == null) {
throw new ElasticsearchParseException("[{}] command missing the node parameter", NAME);
}
return new CancelAllocationCommand(index, shardId, nodeId, allowPrimary);
}
}

View File

@ -27,9 +27,9 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -43,79 +43,7 @@ import java.io.IOException;
public class MoveAllocationCommand implements AllocationCommand {
public static final String NAME = "move";
public static class Factory implements AllocationCommand.Factory<MoveAllocationCommand> {
@Override
public MoveAllocationCommand readFrom(StreamInput in) throws IOException {
return new MoveAllocationCommand(in.readString(), in.readVInt(), in.readString(), in.readString());
}
@Override
public void writeTo(MoveAllocationCommand command, StreamOutput out) throws IOException {
out.writeString(command.index());
out.writeVInt(command.shardId());
out.writeString(command.fromNode());
out.writeString(command.toNode());
}
@Override
public MoveAllocationCommand fromXContent(XContentParser parser) throws IOException {
String index = null;
int shardId = -1;
String fromNode = null;
String toNode = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("from_node".equals(currentFieldName) || "fromNode".equals(currentFieldName)) {
fromNode = parser.text();
} else if ("to_node".equals(currentFieldName) || "toNode".equals(currentFieldName)) {
toNode = parser.text();
} else {
throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
}
} else {
throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
}
}
if (index == null) {
throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
}
if (shardId == -1) {
throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
}
if (fromNode == null) {
throw new ElasticsearchParseException("[{}] command missing the from_node parameter", NAME);
}
if (toNode == null) {
throw new ElasticsearchParseException("[{}] command missing the to_node parameter", NAME);
}
return new MoveAllocationCommand(index, shardId, fromNode, toNode);
}
@Override
public void toXContent(MoveAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.index());
builder.field("shard", command.shardId());
builder.field("from_node", command.fromNode());
builder.field("to_node", command.toNode());
builder.endObject();
}
}
public static final ParseField COMMAND_NAME_FIELD = new ParseField(NAME);
private final String index;
private final int shardId;
@ -129,6 +57,24 @@ public class MoveAllocationCommand implements AllocationCommand {
this.toNode = toNode;
}
/**
* Read from a stream.
*/
public MoveAllocationCommand(StreamInput in) throws IOException {
index = in.readString();
shardId = in.readVInt();
fromNode = in.readString();
toNode = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeString(fromNode);
out.writeString(toNode);
}
@Override
public String name() {
return NAME;
@ -197,4 +143,56 @@ public class MoveAllocationCommand implements AllocationCommand {
}
return new RerouteExplanation(this, decision);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index());
builder.field("shard", shardId());
builder.field("from_node", fromNode());
builder.field("to_node", toNode());
return builder.endObject();
}
public static MoveAllocationCommand fromXContent(XContentParser parser) throws IOException {
String index = null;
int shardId = -1;
String fromNode = null;
String toNode = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("from_node".equals(currentFieldName) || "fromNode".equals(currentFieldName)) {
fromNode = parser.text();
} else if ("to_node".equals(currentFieldName) || "toNode".equals(currentFieldName)) {
toNode = parser.text();
} else {
throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
}
} else {
throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
}
}
if (index == null) {
throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
}
if (shardId == -1) {
throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
}
if (fromNode == null) {
throw new ElasticsearchParseException("[{}] command missing the from_node parameter", NAME);
}
if (toNode == null) {
throw new ElasticsearchParseException("[{}] command missing the to_node parameter", NAME);
}
return new MoveAllocationCommand(index, shardId, fromNode, toNode);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
@ -815,6 +816,13 @@ public abstract class StreamInput extends InputStream {
return readNamedWriteable(DocValueFormat.class);
}
/**
* Reads an {@link AllocationCommand} from the stream.
*/
public AllocationCommand readAllocationCommand() throws IOException {
return readNamedWriteable(AllocationCommand.class);
}
/**
* Reads a list of objects
*/

View File

@ -29,6 +29,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
@ -814,4 +815,11 @@ public abstract class StreamOutput extends OutputStream {
public void writeValueFormat(DocValueFormat format) throws IOException {
writeNamedWriteable(format);
}
/**
* Writes an {@link AllocationCommand} to the stream.
*/
public void writeAllocationCommand(AllocationCommand command) throws IOException {
writeNamedWriteable(command);
}
}

View File

@ -20,12 +20,23 @@
package org.elasticsearch.common.network;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommandRegistry;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
@ -307,6 +318,7 @@ public class NetworkModule extends AbstractModule {
private final Settings settings;
private final boolean transportClient;
private final AllocationCommandRegistry allocationCommandRegistry = new AllocationCommandRegistry();
private final ExtensionPoint.SelectedType<TransportService> transportServiceTypes = new ExtensionPoint.SelectedType<>("transport_service", TransportService.class);
private final ExtensionPoint.SelectedType<Transport> transportTypes = new ExtensionPoint.SelectedType<>("transport", Transport.class);
private final ExtensionPoint.SelectedType<HttpServerTransport> httpTransportTypes = new ExtensionPoint.SelectedType<>("http_transport", HttpServerTransport.class);
@ -332,6 +344,7 @@ public class NetworkModule extends AbstractModule {
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
registerBuiltinAllocationCommands();
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@ -381,6 +394,29 @@ public class NetworkModule extends AbstractModule {
namedWriteableRegistry.register(Task.Status.class, name, reader);
}
/**
* Register an allocation command.
* <p>
* This lives here instead of the more aptly named ClusterModule because the Transport client needs these to be registered.
* </p>
* @param reader the reader to read it from a stream
* @param parser the parser to read it from XContent
* @param commandName the names under which the command should be parsed. The {@link ParseField#getPreferredName()} is special because
* it is the name under which the command's reader is registered.
*/
public <T extends AllocationCommand> void registerAllocationCommand(Writeable.Reader<T> reader, AllocationCommand.Parser<T> parser,
ParseField commandName) {
allocationCommandRegistry.register(parser, commandName);
namedWriteableRegistry.register(AllocationCommand.class, commandName.getPreferredName(), reader);
}
/**
* The registry of allocation command parsers.
*/
public AllocationCommandRegistry getAllocationCommandRegistry() {
return allocationCommandRegistry;
}
@Override
protected void configure() {
bind(NetworkService.class).toInstance(networkService);
@ -401,6 +437,22 @@ public class NetworkModule extends AbstractModule {
bind(RestController.class).asEagerSingleton();
catHandlers.bind(binder());
restHandlers.bind(binder());
// Bind the AllocationCommandRegistry so RestClusterRerouteAction can get it.
bind(AllocationCommandRegistry.class).toInstance(allocationCommandRegistry);
}
}
private void registerBuiltinAllocationCommands() {
registerAllocationCommand(CancelAllocationCommand::new, CancelAllocationCommand::fromXContent,
CancelAllocationCommand.COMMAND_NAME_FIELD);
registerAllocationCommand(MoveAllocationCommand::new, MoveAllocationCommand::fromXContent,
MoveAllocationCommand.COMMAND_NAME_FIELD);
registerAllocationCommand(AllocateReplicaAllocationCommand::new, AllocateReplicaAllocationCommand::fromXContent,
AllocateReplicaAllocationCommand.COMMAND_NAME_FIELD);
registerAllocationCommand(AllocateEmptyPrimaryAllocationCommand::new, AllocateEmptyPrimaryAllocationCommand::fromXContent,
AllocateEmptyPrimaryAllocationCommand.COMMAND_NAME_FIELD);
registerAllocationCommand(AllocateStalePrimaryAllocationCommand::new, AllocateStalePrimaryAllocationCommand::fromXContent,
AllocateStalePrimaryAllocationCommand.COMMAND_NAME_FIELD);
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommandRegistry;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -45,12 +46,17 @@ public class RestClusterRerouteAction extends BaseRestHandler {
private final SettingsFilter settingsFilter;
private static String DEFAULT_METRICS = Strings.arrayToCommaDelimitedString(EnumSet.complementOf(EnumSet.of(ClusterState.Metric.METADATA)).toArray());
private static String DEFAULT_METRICS = Strings
.arrayToCommaDelimitedString(EnumSet.complementOf(EnumSet.of(ClusterState.Metric.METADATA)).toArray());
private final AllocationCommandRegistry registry;
@Inject
public RestClusterRerouteAction(Settings settings, RestController controller, Client client, SettingsFilter settingsFilter) {
public RestClusterRerouteAction(Settings settings, RestController controller, Client client, SettingsFilter settingsFilter,
AllocationCommandRegistry registry) {
super(settings, client);
this.settingsFilter = settingsFilter;
this.registry = registry;
controller.registerHandler(RestRequest.Method.POST, "/_cluster/reroute", this);
}
@ -62,7 +68,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));
clusterRerouteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterRerouteRequest.masterNodeTimeout()));
if (request.hasContent()) {
clusterRerouteRequest.source(request.content());
clusterRerouteRequest.source(request.content(), registry, parseFieldMatcher);
}
client.admin().cluster().reroute(clusterRerouteRequest, new AcknowledgedRestListener<ClusterRerouteResponse>(channel) {

View File

@ -23,11 +23,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpda
import org.elasticsearch.cluster.metadata.IndexTemplateFilter;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;

View File

@ -31,14 +31,18 @@ import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllo
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommandRegistry;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -396,7 +400,15 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
);
BytesStreamOutput bytes = new BytesStreamOutput();
AllocationCommands.writeTo(commands, bytes);
AllocationCommands sCommands = AllocationCommands.readFrom(StreamInput.wrap(bytes.bytes()));
StreamInput in = StreamInput.wrap(bytes.bytes());
// Since the commands are named writeable we need to register them and wrap the input stream
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
new NetworkModule(null, Settings.EMPTY, true, namedWriteableRegistry);
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
// Now we can read them!
AllocationCommands sCommands = AllocationCommands.readFrom(in);
assertThat(sCommands.commands().size(), equalTo(5));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(1));
@ -438,7 +450,9 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
// move two tokens, parser expected to be "on" `commands` field
parser.nextToken();
parser.nextToken();
AllocationCommands sCommands = AllocationCommands.fromXContent(parser);
AllocationCommandRegistry registry = new NetworkModule(null, Settings.EMPTY, true, new NamedWriteableRegistry())
.getAllocationCommandRegistry();
AllocationCommands sCommands = AllocationCommands.fromXContent(parser, registry);
assertThat(sCommands.commands().size(), equalTo(5));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(1));