From e1e9e1a6e14f892871033f94fa455d820ca5f333 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 22 Jul 2015 12:11:10 +0200 Subject: [PATCH 1/3] Transport: allow to de-serialize arbitrary objects given their name This commit makes it possible to serialize arbitrary objects by having them extend Writeable. When reading them though, we need to be able to identify which object we have to create, based on its name. This is useful for queries once we move to parsing on the coordinating node, as well as with aggregations and so on. Introduced a new abstraction called NamedWriteable, which is supported by StreamOutput and StreamInput through writeNamedWriteable and readNamedWriteable methods. A new NamedWriteableRegistry is introduced also where named writeable prototypes need to be registered so that we are able to retrieve the proper instance of the writeable given its name and then de-serialize it calling readFrom against it. Closes #12393 --- .../compress/CompressedStreamInput.java | 4 +- .../common/io/stream/FilterStreamInput.java | 71 ++++++++++ .../common/io/stream/NamedWriteable.java | 33 +++++ .../NamedWriteableAwareStreamInput.java | 42 ++++++ .../io/stream/NamedWriteableRegistry.java | 84 +++++++++++ .../common/io/stream/StreamInput.java | 24 ++-- .../common/io/stream/StreamOutput.java | 42 +++--- .../java/org/elasticsearch/node/Node.java | 5 +- .../transport/TransportModule.java | 3 + .../transport/local/LocalTransport.java | 7 +- .../netty/MessageChannelHandler.java | 2 + .../transport/netty/NettyTransport.java | 10 +- .../BenchmarkNettyLargeMessages.java | 5 +- .../transport/TransportBenchmark.java | 5 +- .../ClusterStateDiffPublishingTests.java | 3 +- .../BytesStreamsTests.java | 133 +++++++++++++++++- .../discovery/ZenFaultDetectionTests.java | 3 +- .../ping/multicast/MulticastZenPingIT.java | 7 +- .../zen/ping/unicast/UnicastZenPingIT.java | 5 +- .../plugins/PluggableTransportModuleIT.java | 5 +- .../transport/AssertingLocalTransport.java | 5 +- .../NettySizeHeaderFrameDecoderTests.java | 3 +- .../local/SimpleLocalTransportTests.java | 3 +- .../netty/NettyScheduledPingTests.java | 5 +- .../transport/netty/NettyTransportIT.java | 10 +- .../netty/NettyTransportMultiPortTests.java | 4 +- .../netty/SimpleNettyTransportTests.java | 3 +- 27 files changed, 460 insertions(+), 66 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java create mode 100644 core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteable.java create mode 100644 core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java create mode 100644 core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java rename core/src/test/java/org/elasticsearch/common/io/{streams => stream}/BytesStreamsTests.java (67%) diff --git a/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java b/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java index 82eefe13a4c..44690e66efa 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java @@ -44,9 +44,9 @@ public abstract class CompressedStreamInput extends StreamInput { } @Override - public StreamInput setVersion(Version version) { + public void setVersion(Version version) { in.setVersion(version); - return super.setVersion(version); + super.setVersion(version); } /** diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java new file mode 100644 index 00000000000..8904e80b646 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; + +import java.io.IOException; + +/** + * Wraps a {@link StreamInput} and delegates to it. To be used to add functionality to an existing stream by subclassing. + */ +public class FilterStreamInput extends StreamInput { + + private final StreamInput delegate; + + protected FilterStreamInput(StreamInput delegate) { + this.delegate = delegate; + } + + @Override + public byte readByte() throws IOException { + return delegate.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + delegate.readBytes(b, offset, len); + } + + @Override + public void reset() throws IOException { + delegate.reset(); + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public Version getVersion() { + return delegate.getVersion(); + } + + @Override + public void setVersion(Version version) { + delegate.setVersion(version); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteable.java b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteable.java new file mode 100644 index 00000000000..9396d50a986 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteable.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +/** + * A {@link Writeable} object identified by its name. + * To be used for arbitrary serializable objects (e.g. queries); when reading them, their name tells + * which specific object needs to be created. + */ +public interface NamedWriteable extends Writeable { + + /** + * Returns the name of the writeable object + */ + String getWriteableName(); +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java new file mode 100644 index 00000000000..a6d17089652 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import java.io.IOException; + +/** + * Wraps a {@link StreamInput} and associates it with a {@link NamedWriteableRegistry} + */ +public class NamedWriteableAwareStreamInput extends FilterStreamInput { + + private final NamedWriteableRegistry namedWriteableRegistry; + + public NamedWriteableAwareStreamInput(StreamInput delegate, NamedWriteableRegistry namedWriteableRegistry) { + super(delegate); + this.namedWriteableRegistry = namedWriteableRegistry; + } + + @Override + C readNamedWriteable(Class categoryClass) throws IOException { + String name = readString(); + NamedWriteable namedWriteable = namedWriteableRegistry.getPrototype(categoryClass, name); + return namedWriteable.readFrom(this); + } +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java new file mode 100644 index 00000000000..42014786749 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import java.util.HashMap; +import java.util.Map; + +/** + * Registry for {@link NamedWriteable} objects. Allows to register and retrieve prototype instances of writeable objects + * given their name. + */ +public class NamedWriteableRegistry { + + private final Map, InnerRegistry> registry = new HashMap<>(); + + /** + * Registers a {@link NamedWriteable} prototype given its category + */ + public synchronized void registerPrototype(Class categoryClass, NamedWriteable namedWriteable) { + @SuppressWarnings("unchecked") + InnerRegistry innerRegistry = (InnerRegistry)registry.get(categoryClass); + if (innerRegistry == null) { + innerRegistry = new InnerRegistry<>(categoryClass); + registry.put(categoryClass, innerRegistry); + } + innerRegistry.registerPrototype(namedWriteable); + } + + /** + * Returns a prototype of the {@link NamedWriteable} object identified by the name provided as argument and its category + */ + public synchronized NamedWriteable getPrototype(Class categoryClass, String name) { + @SuppressWarnings("unchecked") + InnerRegistry innerRegistry = (InnerRegistry)registry.get(categoryClass); + if (innerRegistry == null) { + throw new IllegalArgumentException("unknown named writeable category [" + categoryClass.getName() + "]"); + } + return innerRegistry.getPrototype(name); + } + + private static class InnerRegistry { + + private final Map> registry = new HashMap<>(); + private final Class categoryClass; + + private InnerRegistry(Class categoryClass) { + this.categoryClass = categoryClass; + } + + private void registerPrototype(NamedWriteable namedWriteable) { + NamedWriteable existingNamedWriteable = registry.get(namedWriteable.getWriteableName()); + if (existingNamedWriteable != null) { + throw new IllegalArgumentException("named writeable of type [" + namedWriteable.getClass().getName() + "] with name [" + namedWriteable.getWriteableName() + "] " + + "is already registered by type [" + existingNamedWriteable.getClass().getName() + "] within category [" + categoryClass.getName() + "]"); + } + registry.put(namedWriteable.getWriteableName(), namedWriteable); + } + + private NamedWriteable getPrototype(String name) { + NamedWriteable namedWriteable = registry.get(name); + if (namedWriteable == null) { + throw new IllegalArgumentException("unknown named writeable with name [" + name + "] within category [" + categoryClass.getName() + "]"); + } + return namedWriteable; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 9c7041abedc..cb42a9ff1b2 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.io.stream; -import com.fasterxml.jackson.core.JsonLocation; -import com.fasterxml.jackson.core.JsonParseException; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -28,7 +26,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CharsRefBuilder; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -42,14 +39,10 @@ import org.joda.time.DateTimeZone; import java.io.*; import java.nio.file.NoSuchFileException; import java.util.*; -import java.util.regex.Pattern; import static org.elasticsearch.ElasticsearchException.readException; import static org.elasticsearch.ElasticsearchException.readStackTrace; -/** - * - */ public abstract class StreamInput extends InputStream { private Version version = Version.CURRENT; @@ -58,9 +51,8 @@ public abstract class StreamInput extends InputStream { return this.version; } - public StreamInput setVersion(Version version) { + public void setVersion(Version version) { this.version = version; - return this; } /** @@ -256,7 +248,7 @@ public abstract class StreamInput extends InputStream { final int charCount = readVInt(); spare.clear(); spare.grow(charCount); - int c = 0; + int c; while (spare.length() < charCount) { c = readByte() & 0xff; switch (c >> 4) { @@ -348,6 +340,7 @@ public abstract class StreamInput extends InputStream { } @Nullable + @SuppressWarnings("unchecked") public Map readMap() throws IOException { return (Map) readGenericValue(); } @@ -556,6 +549,16 @@ public abstract class StreamInput extends InputStream { return null; } + /** + * Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for + * the corresponding entry in the registry by name, so that the proper object can be read and returned. + * Default implementation throws {@link UnsupportedOperationException} as StreamInput doesn't hold a registry. + * Use {@link FilterInputStream} instead which wraps a stream and supports a {@link NamedWriteableRegistry} too. + */ + C readNamedWriteable(@SuppressWarnings("unused") Class categoryClass) throws IOException { + throw new UnsupportedOperationException(); + } + public static StreamInput wrap(BytesReference reference) { if (reference.hasArray() == false) { reference = reference.toBytesArray(); @@ -570,5 +573,4 @@ public abstract class StreamInput extends InputStream { public static StreamInput wrap(byte[] bytes, int offset, int length) { return new InputStreamStreamInput(new ByteArrayInputStream(bytes, offset, length)); } - } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index d00ca4446c2..afd607364c1 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.io.stream; -import com.vividsolutions.jts.util.Assert; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -371,6 +370,7 @@ public abstract class StreamOutput extends OutputStream { } else { writeByte((byte) 10); } + @SuppressWarnings("unchecked") Map map = (Map) value; writeVInt(map.size()); for (Map.Entry entry : map.entrySet()) { @@ -416,31 +416,31 @@ public abstract class StreamOutput extends OutputStream { } } - public void writeIntArray(int[] value) throws IOException { - writeVInt(value.length); - for (int i=0; i implem private final static ConcurrentMap transports = newConcurrentMap(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); + private final NamedWriteableRegistry namedWriteableRegistry; public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address"; public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers"; public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue"; @Inject - public LocalTransport(Settings settings, ThreadPool threadPool, Version version) { + public LocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { super(settings); this.threadPool = threadPool; this.version = version; @@ -81,6 +84,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory); + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -256,6 +260,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception { + stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry); final String action = stream.readString(); transportServiceAdapter.onRequestReceived(requestId, action); final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index c74f0f68d01..3bf4fa6701d 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -226,6 +227,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry); final String action = buffer.readString(); transportServiceAdapter.onRequestReceived(requestId, action); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 520f54bda39..0b85bb63211 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -24,7 +24,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.elasticsearch.*; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -32,6 +33,7 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasables; @@ -149,6 +151,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem protected volatile TransportServiceAdapter transportServiceAdapter; protected volatile BoundTransportAddress boundAddress; protected final KeyedLock connectionLock = new KeyedLock<>(); + protected final NamedWriteableRegistry namedWriteableRegistry; // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) @@ -158,7 +161,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ScheduledPing scheduledPing; @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) { super(settings); this.threadPool = threadPool; this.networkService = networkService; @@ -214,6 +217,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (pingSchedule.millis() > 0) { threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing); } + this.namedWriteableRegistry = namedWriteableRegistry; } public Settings settings() { @@ -969,7 +973,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory { - protected NettyTransport nettyTransport; + protected final NettyTransport nettyTransport; public ClientChannelPipelineFactory(NettyTransport nettyTransport) { this.nettyTransport = nettyTransport; diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index fcc755b4132..b0271e39753 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.settings.DynamicSettings; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -59,10 +60,10 @@ public class BenchmarkNettyLargeMessages { final ThreadPool threadPool = new ThreadPool("BenchmarkNettyLargeMessages"); final TransportService transportServiceServer = new TransportService( - new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool + new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool ).start(); final TransportService transportServiceClient = new TransportService( - new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool + new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool ).start(); final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index ff5c9c62261..3e5b23b5bfb 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -22,6 +22,7 @@ package org.elasticsearch.benchmark.transport; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -44,13 +45,13 @@ public class TransportBenchmark { LOCAL { @Override public Transport newTransport(Settings settings, ThreadPool threadPool) { - return new LocalTransport(settings, threadPool, Version.CURRENT); + return new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()); } }, NETTY { @Override public Transport newTransport(Settings settings, ThreadPool threadPool) { - return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); } }; diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java index 3006e3f5228..5575ed93317 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffPublishingTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; @@ -168,7 +169,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase { } protected MockTransportService buildTransportService(Settings settings, Version version) { - MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version), threadPool); + MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java similarity index 67% rename from core/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java rename to core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index bc25a9f5919..d313dd71d81 100644 --- a/core/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -17,17 +17,17 @@ * under the License. */ -package org.elasticsearch.common.io.streams; +package org.elasticsearch.common.io.stream; import org.apache.lucene.util.Constants; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; -import org.junit.Ignore; import org.junit.Test; +import java.io.IOException; +import java.util.Objects; + import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; @@ -304,6 +304,131 @@ public class BytesStreamsTests extends ESTestCase { out.close(); } + @Test + public void testNamedWriteable() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); + namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); + TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)); + out.writeNamedWriteable(namedWriteableIn); + StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry); + BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class); + assertEquals(namedWriteableOut, namedWriteableIn); + } + + @Test + public void testNamedWriteableDuplicates() throws IOException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); + namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); + try { + namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); + fail("registerPrototype should have failed"); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("named writeable of type [" + TestNamedWriteable.class.getName() + "] with name [" + TestNamedWriteable.NAME + "] is already registered by type [" + + TestNamedWriteable.class.getName() + "] within category [" + BaseNamedWriteable.class.getName() + "]")); + } + } + + @Test + public void testNamedWriteableUnknownCategory() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + out.writeNamedWriteable(new TestNamedWriteable("test1", "test2")); + StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), new NamedWriteableRegistry()); + try { + //no named writeable registered with given name, can write but cannot read it back + in.readNamedWriteable(BaseNamedWriteable.class); + fail("read should have failed"); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("unknown named writeable category [" + BaseNamedWriteable.class.getName() + "]")); + } + } + + @Test + public void testNamedWriteableUnknownNamedWriteable() throws IOException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); + namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); + BytesStreamOutput out = new BytesStreamOutput(); + out.writeNamedWriteable(new NamedWriteable() { + @Override + public String getWriteableName() { + return "unknown"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + } + + @Override + public Object readFrom(StreamInput in) throws IOException { + return null; + } + }); + StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry); + try { + //no named writeable registered with given name under test category, can write but cannot read it back + in.readNamedWriteable(BaseNamedWriteable.class); + fail("read should have failed"); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("unknown named writeable with name [unknown] within category [" + BaseNamedWriteable.class.getName() + "]")); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testNamedWriteableNotSupportedWithoutWrapping() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + TestNamedWriteable testNamedWriteable = new TestNamedWriteable("test1", "test2"); + out.writeNamedWriteable(testNamedWriteable); + StreamInput in = StreamInput.wrap(out.bytes().toBytes()); + in.readNamedWriteable(BaseNamedWriteable.class); + } + + private static abstract class BaseNamedWriteable implements NamedWriteable { + + } + + private static class TestNamedWriteable extends BaseNamedWriteable { + + private static final String NAME = "test-named-writeable"; + + private final String field1; + private final String field2; + + TestNamedWriteable(String field1, String field2) { + this.field1 = field1; + this.field2 = field2; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(field1); + out.writeString(field2); + } + + @Override + public TestNamedWriteable readFrom(StreamInput in) throws IOException { + return new TestNamedWriteable(in.readString(), in.readString()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestNamedWriteable that = (TestNamedWriteable) o; + return Objects.equals(field1, that.field1) && + Objects.equals(field2, that.field2); + } + + @Override + public int hashCode() { + return Objects.hash(field1, field2); + } + } + // we ignore this test for now since all existing callers of BytesStreamOutput happily // call bytes() after close(). @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12620") diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 1471b5b9b6f..a39b15468d9 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; @@ -105,7 +106,7 @@ public class ZenFaultDetectionTests extends ESTestCase { } protected MockTransportService build(Settings settings, Version version) { - MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool); + MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java index 73ba27ddc42..00da8b1d54b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -62,10 +63,10 @@ public class MulticastZenPingIT extends ESTestCase { ThreadPool threadPool = new ThreadPool("testSimplePings"); final ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT); MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); @@ -135,7 +136,7 @@ public class MulticastZenPingIT extends ESTestCase { final ThreadPool threadPool = new ThreadPool("testExternalPing"); final ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java index d3a0ac82d92..83d31450088 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -54,13 +55,13 @@ public class UnicastZenPingIT extends ESTestCase { NetworkService networkService = new NetworkService(settings); ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT); - NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java b/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java index b418081c898..514e8460f08 100644 --- a/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java +++ b/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.plugins; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.AssertingLocalTransport; @@ -91,8 +92,8 @@ public class PluggableTransportModuleIT extends ESIntegTestCase { public static final class CountingAssertingLocalTransport extends AssertingLocalTransport { @Inject - public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) { - super(settings, threadPool, version); + public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + super(settings, threadPool, version, namedWriteableRegistry); } @Override diff --git a/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java b/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java index 3becbd2cf49..1229f9e65ec 100644 --- a/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java +++ b/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java @@ -22,6 +22,7 @@ package org.elasticsearch.test.transport; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.VersionUtils; @@ -45,8 +46,8 @@ public class AssertingLocalTransport extends LocalTransport { private final Version maxVersion; @Inject - public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) { - super(settings, threadPool, version); + public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + super(settings, threadPool, version, namedWriteableRegistry); final long seed = settings.getAsLong(ESIntegTestCase.SETTING_INDEX_SEED, 0l); random = new Random(seed); minVersion = settings.getAsVersion(ASSERTING_TRANSPORT_MIN_VERSION_KEY, Version.V_0_18_0); diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 71f4885e32b..1c5f34b8ce2 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import com.google.common.base.Charsets; import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -62,7 +63,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { threadPool.setNodeSettingsService(new NodeSettingsService(settings)); NetworkService networkService = new NetworkService(settings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); - nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT); + nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry()); nettyTransport.start(); TransportService transportService = new TransportService(nettyTransport, threadPool); nettyTransport.transportServiceAdapter(transportService.createAdapter()); diff --git a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 4c04c79ff73..3a75da5a89d 100644 --- a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.local; import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.AbstractSimpleTransportTests; @@ -28,7 +29,7 @@ public class SimpleLocalTransportTests extends AbstractSimpleTransportTests { @Override protected MockTransportService build(Settings settings, Version version) { - MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool); + MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index e3f515b3b4e..d5371999c1e 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableMap; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -48,11 +49,11 @@ public class NettyScheduledPingTests extends ESTestCase { int endPort = startPort + 10; Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", startPort + "-" + endPort).build(); - final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); serviceA.start(); - final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); serviceB.start(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java index 97515ce57e5..f53f3e07f5f 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Client; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.network.NetworkService; @@ -34,7 +35,10 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.RequestHandlerRegistry; +import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TransportRequest; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -83,8 +87,8 @@ public class NettyTransportIT extends ESIntegTestCase { public static final class ExceptionThrowingNettyTransport extends NettyTransport { @Inject - public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { - super(settings, threadPool, networkService, bigArrays, version); + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) { + super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry); } @Override diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java index 55925dbd76b..704dbe9e85e 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java @@ -20,10 +20,10 @@ package org.elasticsearch.transport.netty; import com.carrotsearch.hppc.IntHashSet; import com.google.common.base.Charsets; - import org.elasticsearch.Version; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -213,7 +213,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) { BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); - NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT); + NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, new NamedWriteableRegistry()); nettyTransport.start(); assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index cca84a14097..0b0b1f8ac64 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -37,7 +38,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { int startPort = 11000 + randomIntBetween(0, 255); int endPort = startPort + 10; settings = Settings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build(); - MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version), threadPool); + MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry()), threadPool); transportService.start(); return transportService; } From 63d18d5e05b1313348d2925d498f41112b6db7df Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 6 Aug 2015 12:55:57 +0200 Subject: [PATCH 2/3] Make FilterInputStream abstract --- .../org/elasticsearch/common/io/stream/FilterStreamInput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 8904e80b646..0dac786778f 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -26,7 +26,7 @@ import java.io.IOException; /** * Wraps a {@link StreamInput} and delegates to it. To be used to add functionality to an existing stream by subclassing. */ -public class FilterStreamInput extends StreamInput { +public abstract class FilterStreamInput extends StreamInput { private final StreamInput delegate; From 6f13171d50612f3004cc0eee5ddb5beead06b104 Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 6 Aug 2015 12:56:28 +0200 Subject: [PATCH 3/3] [TEST] add NamedWriteableRegistry argument to AbstractSimpleTransportTests#build method --- .../transport/AbstractSimpleTransportTests.java | 7 ++++--- .../transport/local/SimpleLocalTransportTests.java | 4 ++-- .../transport/netty/SimpleNettyTransportTests.java | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 48b9e9fcc0f..9041bca2797 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport; import com.google.common.collect.ImmutableMap; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -59,7 +60,7 @@ public abstract class AbstractSimpleTransportTests extends ESTestCase { protected DiscoveryNode nodeB; protected MockTransportService serviceB; - protected abstract MockTransportService build(Settings settings, Version version); + protected abstract MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry); @Override @Before @@ -68,12 +69,12 @@ public abstract class AbstractSimpleTransportTests extends ESTestCase { threadPool = new ThreadPool(getClass().getName()); serviceA = build( Settings.builder().put("name", "TS_A", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(), - version0 + version0, new NamedWriteableRegistry() ); nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.of(), version0); serviceB = build( Settings.builder().put("name", "TS_B", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(), - version1 + version1, new NamedWriteableRegistry() ); nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.of(), version1); diff --git a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 3a75da5a89d..e87b0786a5b 100644 --- a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -28,8 +28,8 @@ import org.elasticsearch.transport.AbstractSimpleTransportTests; public class SimpleLocalTransportTests extends AbstractSimpleTransportTests { @Override - protected MockTransportService build(Settings settings, Version version) { - MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool); + protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { + MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 0b0b1f8ac64..a0b6ddbdbb5 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -34,11 +34,11 @@ import org.junit.Test; public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { @Override - protected MockTransportService build(Settings settings, Version version) { + protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { int startPort = 11000 + randomIntBetween(0, 255); int endPort = startPort + 10; settings = Settings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build(); - MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry()), threadPool); + MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry), threadPool); transportService.start(); return transportService; }