Stop using PROTOTYPE in NamedWriteableRegistry

readFrom is confusing because it requires an instance of the type that it
is reading but it doesn't modify it. But we also have (deprecated) methods
named readFrom that *do* modify the instance. The "right" way to implement
the non-modifying readFrom is to delegate to a constructor that takes a
StreamInput so that the read object can be immutable. Now that we have
`@FunctionalInterface`s it is fairly easy to register things by referring
directly to the constructor.

This change modifying NamedWriteableRegistry so that it does that. It keeps
supporting `registerPrototype` which registers objects to be read by
readFrom but deprecates it and delegates it to a new `register` method
that allows passing a simple functional interface. It also cuts Task.Status
subclasses over to using that method.

The start of #17085
This commit is contained in:
Nik Everett 2016-03-23 10:10:42 -04:00
parent 609456a324
commit 93ab4cfc99
18 changed files with 104 additions and 110 deletions

View File

@ -358,7 +358,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]inject[/\\]multibindings[/\\]MapBinder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]inject[/\\]spi[/\\]InjectionPoint.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]io[/\\]Channels.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]io[/\\]stream[/\\]NamedWriteableRegistry.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]joda[/\\]Joda.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]lucene[/\\]Lucene.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]lucene[/\\]all[/\\]AllTermQuery.java" checks="LineLength" />

View File

@ -235,7 +235,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
scriptStats = in.readOptionalStreamable(ScriptStats::new);
discoveryStats = in.readOptionalStreamable(() -> new DiscoveryStats(null));
ingestStats = in.readOptionalWritable(IngestStats::new);
ingestStats = in.readOptionalWriteable(IngestStats::new);
}
@Override

View File

@ -283,7 +283,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
source = in.readString();
updateAllTypes = in.readBoolean();
readTimeout(in);
concreteIndex = in.readOptionalWritable(Index::new);
concreteIndex = in.readOptionalWriteable(Index::new);
}
@Override

View File

@ -59,7 +59,7 @@ public class ReplicationTask extends Task {
}
public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status("prototype");
public static final String NAME = "replication";
private final String phase;
@ -73,7 +73,7 @@ public class ReplicationTask extends Task {
@Override
public String getWriteableName() {
return "replication";
return NAME;
}
@Override
@ -88,10 +88,5 @@ public class ReplicationTask extends Task {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
@Override
public Status readFrom(StreamInput in) throws IOException {
return new Status(in);
}
}
}

View File

@ -36,7 +36,6 @@ public class NamedWriteableAwareStreamInput extends FilterStreamInput {
@Override
<C> C readNamedWriteable(Class<C> categoryClass) throws IOException {
String name = readString();
NamedWriteable<? extends C> namedWriteable = namedWriteableRegistry.getPrototype(categoryClass, name);
return namedWriteable.readFrom(this);
return namedWriteableRegistry.getReader(categoryClass, name).read(this);
}
}

View File

@ -31,54 +31,70 @@ public class NamedWriteableRegistry {
private final Map<Class<?>, InnerRegistry<?>> registry = new HashMap<>();
/**
* Registers a {@link NamedWriteable} prototype given its category
* Register a {@link NamedWriteable} given its category, its name, and a function to read it from the stream.
*
* This method suppresses the rawtypes warning because it intentionally using NamedWriteable instead of {@code NamedWriteable<T>} so it
* is easier to use and because we might be able to drop the type parameter from NamedWriteable entirely some day.
*/
public synchronized <T> void registerPrototype(Class<T> categoryClass, NamedWriteable<? extends T> namedWriteable) {
@SuppressWarnings("rawtypes")
public synchronized <T extends NamedWriteable> void register(Class<T> categoryClass, String name,
Writeable.Reader<? extends T> reader) {
@SuppressWarnings("unchecked")
InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
InnerRegistry<T> innerRegistry = (InnerRegistry<T>) registry.get(categoryClass);
if (innerRegistry == null) {
innerRegistry = new InnerRegistry<>(categoryClass);
registry.put(categoryClass, innerRegistry);
}
innerRegistry.registerPrototype(namedWriteable);
innerRegistry.register(name, reader);
}
/**
* Registers a {@link NamedWriteable} prototype given its category.
* @deprecated Prefer {@link #register(Class, String, org.elasticsearch.common.io.stream.Writeable.Reader)}
*/
@Deprecated
@SuppressWarnings("rawtypes") // TODO remove this method entirely before 5.0.0 GA
public synchronized <T extends NamedWriteable> void registerPrototype(Class<T> categoryClass,
NamedWriteable<? extends T> namedWriteable) {
register(categoryClass, namedWriteable.getWriteableName(), namedWriteable::readFrom);
}
/**
* Returns a prototype of the {@link NamedWriteable} object identified by the name provided as argument and its category
*/
public synchronized <T> NamedWriteable<? extends T> getPrototype(Class<T> categoryClass, String name) {
public synchronized <T> Writeable.Reader<? extends T> getReader(Class<T> categoryClass, String name) {
@SuppressWarnings("unchecked")
InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
if (innerRegistry == null) {
throw new IllegalArgumentException("unknown named writeable category [" + categoryClass.getName() + "]");
}
return innerRegistry.getPrototype(name);
return innerRegistry.getReader(name);
}
private static class InnerRegistry<T> {
private final Map<String, NamedWriteable<? extends T>> registry = new HashMap<>();
private final Map<String, Writeable.Reader<? extends T>> registry = new HashMap<>();
private final Class<T> categoryClass;
private InnerRegistry(Class<T> categoryClass) {
this.categoryClass = categoryClass;
}
private void registerPrototype(NamedWriteable<? extends T> namedWriteable) {
NamedWriteable<? extends T> existingNamedWriteable = registry.get(namedWriteable.getWriteableName());
if (existingNamedWriteable != null) {
throw new IllegalArgumentException("named writeable of type [" + namedWriteable.getClass().getName() + "] with name [" + namedWriteable.getWriteableName() + "] " +
"is already registered by type [" + existingNamedWriteable.getClass().getName() + "] within category [" + categoryClass.getName() + "]");
private void register(String name, Writeable.Reader<? extends T> reader) {
Writeable.Reader<? extends T> existingReader = registry.get(name);
if (existingReader != null) {
throw new IllegalArgumentException(
"named writeable [" + categoryClass.getName() + "][" + name + "] is already registered by [" + reader + "]");
}
registry.put(namedWriteable.getWriteableName(), namedWriteable);
registry.put(name, reader);
}
private NamedWriteable<? extends T> getPrototype(String name) {
NamedWriteable<? extends T> namedWriteable = registry.get(name);
if (namedWriteable == null) {
throw new IllegalArgumentException("unknown named writeable with name [" + name + "] within category [" + categoryClass.getName() + "]");
private Writeable.Reader<? extends T> getReader(String name) {
Writeable.Reader<? extends T> reader = registry.get(name);
if (reader == null) {
throw new IllegalArgumentException("unknown named writeable [" + categoryClass.getName() + "][" + name + "]");
}
return namedWriteable;
return reader;
}
}
}

View File

@ -566,9 +566,9 @@ public abstract class StreamInput extends InputStream {
}
}
public <T extends Writeable> T readOptionalWritable(Writeable.IOFunction<StreamInput, T> provider) throws IOException {
public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> provider) throws IOException {
if (readBoolean()) {
return provider.apply(this);
return provider.read(this);
} else {
return null;
}

View File

@ -23,10 +23,7 @@ import java.io.IOException;
/**
* Implementers can be read from {@linkplain StreamInput} by calling their {@link #readFrom(StreamInput)} method.
*
* It is common for implementers of this interface to declare a <code>public static final</code> instance of themselves named PROTOTYPE so
* users can call {@linkplain #readFrom(StreamInput)} on it. It is also fairly typical for readFrom to be implemented as a method that just
* calls a constructor that takes {@linkplain StreamInput} as a parameter. This allows the fields in the implementer to be
* <code>final</code>.
* Implementers of this interface that also implement {@link Writeable} should see advice there on how to do so.
*/
public interface StreamableReader<T> {
/**

View File

@ -31,21 +31,30 @@ import java.io.IOException;
*
* Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable}
* so this isn't always possible.
*
* The fact that this interface extends {@link StreamableReader} should be consider vestigial. Instead of using its
* {@link #readFrom(StreamInput)} method you should prefer using the Reader interface as a reference to a constructor that takes
* {@link StreamInput}. The reasoning behind this is that most "good" readFrom implementations just delegated to such a constructor anyway
* and they required an unsightly PROTOTYPE object.
*/
public interface Writeable<T> extends StreamableReader<T> {
public interface Writeable<T> extends StreamableReader<T> { // TODO remove extends StreamableReader<T> from this interface, and remove <T>
/**
* Write this into the {@linkplain StreamOutput}.
*/
void writeTo(StreamOutput out) throws IOException;
@FunctionalInterface
interface IOFunction<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t) throws IOException;
}
@Override
default T readFrom(StreamInput in) throws IOException {
// See class javadoc for reasoning
throw new UnsupportedOperationException("Prefer calling a constructor that takes a StreamInput to calling readFrom.");
}
/**
* Reference to a method that can read some object from a stream. By convention this is a constructor that takes
* {@linkplain StreamInput} as an argument for most classes and a static method for things like enums.
*/
@FunctionalInterface
interface Reader<R> {
R read(StreamInput t) throws IOException;
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -328,7 +329,7 @@ public class NetworkModule extends AbstractModule {
registerTransportService(NETTY_TRANSPORT, TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.PROTOTYPE);
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@ -374,8 +375,8 @@ public class NetworkModule extends AbstractModule {
}
}
public void registerTaskStatus(Task.Status prototype) {
namedWriteableRegistry.registerPrototype(Task.Status.class, prototype);
public void registerTaskStatus(String name, Writeable.Reader<? extends Task.Status> reader) {
namedWriteableRegistry.register(Task.Status.class, name, reader);
}
@Override

View File

@ -146,8 +146,7 @@ public abstract class AbstractShapeBuilderTestCase<SB extends ShapeBuilder> exte
try (BytesStreamOutput output = new BytesStreamOutput()) {
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
ShapeBuilder prototype = (ShapeBuilder) namedWriteableRegistry.getPrototype(ShapeBuilder.class, original.getWriteableName());
return prototype.readFrom(in);
return namedWriteableRegistry.getReader(ShapeBuilder.class, original.getWriteableName()).read(in);
}
}
}

View File

@ -31,6 +31,7 @@ import java.util.Objects;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
/**
* Tests for {@link BytesStreamOutput} paging behaviour.
@ -301,7 +302,7 @@ public class BytesStreamsTests extends ESTestCase {
public void testNamedWriteable() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);
TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
out.writeNamedWriteable(namedWriteableIn);
byte[] bytes = out.bytes().toBytes();
@ -314,32 +315,25 @@ public class BytesStreamsTests extends ESTestCase {
public void testNamedWriteableDuplicates() throws IOException {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
try {
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
fail("registerPrototype should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("named writeable of type [" + TestNamedWriteable.class.getName() + "] with name [" + TestNamedWriteable.NAME + "] is already registered by type ["
+ TestNamedWriteable.class.getName() + "] within category [" + BaseNamedWriteable.class.getName() + "]"));
}
namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new));
assertThat(e.getMessage(), startsWith("named writeable [" + BaseNamedWriteable.class.getName() + "][" + TestNamedWriteable.NAME
+ "] is already registered by ["));
}
public void testNamedWriteableUnknownCategory() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.writeNamedWriteable(new TestNamedWriteable("test1", "test2"));
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), new NamedWriteableRegistry());
try {
//no named writeable registered with given name, can write but cannot read it back
in.readNamedWriteable(BaseNamedWriteable.class);
fail("read should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unknown named writeable category [" + BaseNamedWriteable.class.getName() + "]"));
}
//no named writeable registered with given name, can write but cannot read it back
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> in.readNamedWriteable(BaseNamedWriteable.class));
assertThat(e.getMessage(), equalTo("unknown named writeable category [" + BaseNamedWriteable.class.getName() + "]"));
}
public void testNamedWriteableUnknownNamedWriteable() throws IOException {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);
BytesStreamOutput out = new BytesStreamOutput();
out.writeNamedWriteable(new NamedWriteable() {
@Override
@ -362,7 +356,7 @@ public class BytesStreamsTests extends ESTestCase {
in.readNamedWriteable(BaseNamedWriteable.class);
fail("read should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unknown named writeable with name [unknown] within category [" + BaseNamedWriteable.class.getName() + "]"));
assertThat(e.getMessage(), equalTo("unknown named writeable [" + BaseNamedWriteable.class.getName() + "][unknown]"));
}
}
@ -395,6 +389,11 @@ public class BytesStreamsTests extends ESTestCase {
this.field2 = field2;
}
public TestNamedWriteable(StreamInput in) throws IOException {
field1 = in.readString();
field2 = in.readString();
}
@Override
public String getWriteableName() {
return NAME;
@ -406,11 +405,6 @@ public class BytesStreamsTests extends ESTestCase {
out.writeString(field2);
}
@Override
public TestNamedWriteable readFrom(StreamInput in) throws IOException {
return new TestNamedWriteable(in.readString(), in.readString());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -191,23 +191,24 @@ public class NetworkModuleTests extends ModuleTestCase {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, registry);
// Builtin prototype comes back
assertNotNull(registry.getPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE.getWriteableName()));
// Builtin reader comes back
assertNotNull(registry.getReader(Task.Status.class, ReplicationTask.Status.NAME));
Task.Status dummy = new DummyTaskStatus();
module.registerTaskStatus(dummy);
assertThat(registry.getPrototype(Task.Status.class, "dummy"), sameInstance(dummy));
module.registerTaskStatus(DummyTaskStatus.NAME, DummyTaskStatus::new);
assertEquals("test", expectThrows(UnsupportedOperationException.class,
() -> registry.getReader(Task.Status.class, DummyTaskStatus.NAME).read(null)).getMessage());
}
private class DummyTaskStatus implements Task.Status {
@Override
public String getWriteableName() {
return "dummy";
public static final String NAME = "dummy";
public DummyTaskStatus(StreamInput in) {
throw new UnsupportedOperationException("test");
}
@Override
public Status readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
public String getWriteableName() {
return NAME;
}
@Override

View File

@ -252,9 +252,8 @@ public abstract class BaseAggregationTestCase<AB extends AggregatorBuilder<AB>>
try (BytesStreamOutput output = new BytesStreamOutput()) {
testAgg.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
AggregatorBuilder prototype = (AggregatorBuilder) namedWriteableRegistry.getPrototype(AggregatorBuilder.class,
testAgg.getWriteableName());
AggregatorBuilder deserializedQuery = prototype.readFrom(in);
AggregatorBuilder deserializedQuery = namedWriteableRegistry.getReader(AggregatorBuilder.class, testAgg.getWriteableName())
.read(in);
assertEquals(deserializedQuery, testAgg);
assertEquals(deserializedQuery.hashCode(), testAgg.hashCode());
assertNotSame(deserializedQuery, testAgg);
@ -294,10 +293,8 @@ public abstract class BaseAggregationTestCase<AB extends AggregatorBuilder<AB>>
try (BytesStreamOutput output = new BytesStreamOutput()) {
agg.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
AggregatorBuilder prototype = (AggregatorBuilder) namedWriteableRegistry.getPrototype(AggregatorBuilder.class,
agg.getWriteableName());
@SuppressWarnings("unchecked")
AB secondAgg = (AB) prototype.readFrom(in);
AB secondAgg = (AB) namedWriteableRegistry.getReader(AggregatorBuilder.class, agg.getWriteableName()).read(in);
return secondAgg;
}
}

View File

@ -255,10 +255,7 @@ public abstract class AbstractSortTestCase<T extends SortBuilder<T>> extends EST
try (BytesStreamOutput output = new BytesStreamOutput()) {
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
T prototype = (T) namedWriteableRegistry.getPrototype(SortBuilder.class,
original.getWriteableName());
T copy = prototype.readFrom(in);
return copy;
return (T) namedWriteableRegistry.getReader(SortBuilder.class, original.getWriteableName()).read(in);
}
}
}

View File

@ -108,9 +108,7 @@ public abstract class SmoothingModelTestCase extends ESTestCase {
XContentParser parser = XContentHelper.createParser(contentBuilder.bytes());
context.reset(parser);
parser.nextToken(); // go to start token, real parsing would do that in the outer element parser
SmoothingModel prototype = (SmoothingModel) namedWriteableRegistry.getPrototype(SmoothingModel.class,
testModel.getWriteableName());
SmoothingModel parsedModel = prototype.innerFromXContent(context);
SmoothingModel parsedModel = testModel.innerFromXContent(context);
assertNotSame(testModel, parsedModel);
assertEquals(testModel, parsedModel);
assertEquals(testModel.hashCode(), parsedModel.hashCode());
@ -188,9 +186,7 @@ public abstract class SmoothingModelTestCase extends ESTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
SmoothingModel prototype = (SmoothingModel) namedWriteableRegistry.getPrototype(SmoothingModel.class,
original.getWriteableName());
return prototype.readFrom(in);
return namedWriteableRegistry.getReader(SmoothingModel.class, original.getWriteableName()).read(in);
}
}
}

View File

@ -69,8 +69,7 @@ public class BulkByScrollTask extends CancellableTask {
}
public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, timeValueNanos(0), null);
public static final String NAME = "bulk-by-scroll";
private final long total;
private final long updated;
private final long created;
@ -178,12 +177,7 @@ public class BulkByScrollTask extends CancellableTask {
@Override
public String getWriteableName() {
return "bulk-by-scroll";
}
@Override
public Status readFrom(StreamInput in) throws IOException {
return new Status(in);
return NAME;
}
/**

View File

@ -44,6 +44,6 @@ public class ReindexPlugin extends Plugin {
public void onModule(NetworkModule networkModule) {
networkModule.registerRestHandler(RestReindexAction.class);
networkModule.registerRestHandler(RestUpdateByQueryAction.class);
networkModule.registerTaskStatus(BulkByScrollTask.Status.PROTOTYPE);
networkModule.registerTaskStatus(BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new);
}
}