Transport: allow to de-serialize arbitrary objects given their name

This commit makes it possible to serialize arbitrary objects by having them extend Writeable. When reading them though, we need to be able to identify which object we have to create, based on its name. This is useful for queries once we move to parsing on the coordinating node, as well as with aggregations and so on.

Introduced a new abstraction called NamedWriteable, which is supported by StreamOutput and StreamInput through writeNamedWriteable and readNamedWriteable methods. A new NamedWriteableRegistry is introduced also where named writeable prototypes need to be registered so that we are able to retrieve the proper instance of the writeable given its name and then de-serialize it calling readFrom against it.

Closes #12393
This commit is contained in:
javanna 2015-07-22 12:11:10 +02:00 committed by Luca Cavanna
parent d0abffc9ac
commit e1e9e1a6e1
27 changed files with 460 additions and 66 deletions

View File

@ -44,9 +44,9 @@ public abstract class CompressedStreamInput extends StreamInput {
}
@Override
public StreamInput setVersion(Version version) {
public void setVersion(Version version) {
in.setVersion(version);
return super.setVersion(version);
super.setVersion(version);
}
/**

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import java.io.IOException;
/**
* Wraps a {@link StreamInput} and delegates to it. To be used to add functionality to an existing stream by subclassing.
*/
public class FilterStreamInput extends StreamInput {
private final StreamInput delegate;
protected FilterStreamInput(StreamInput delegate) {
this.delegate = delegate;
}
@Override
public byte readByte() throws IOException {
return delegate.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
delegate.readBytes(b, offset, len);
}
@Override
public void reset() throws IOException {
delegate.reset();
}
@Override
public int read() throws IOException {
return delegate.read();
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public Version getVersion() {
return delegate.getVersion();
}
@Override
public void setVersion(Version version) {
delegate.setVersion(version);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
/**
* A {@link Writeable} object identified by its name.
* To be used for arbitrary serializable objects (e.g. queries); when reading them, their name tells
* which specific object needs to be created.
*/
public interface NamedWriteable<T> extends Writeable<T> {
/**
* Returns the name of the writeable object
*/
String getWriteableName();
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
* Wraps a {@link StreamInput} and associates it with a {@link NamedWriteableRegistry}
*/
public class NamedWriteableAwareStreamInput extends FilterStreamInput {
private final NamedWriteableRegistry namedWriteableRegistry;
public NamedWriteableAwareStreamInput(StreamInput delegate, NamedWriteableRegistry namedWriteableRegistry) {
super(delegate);
this.namedWriteableRegistry = namedWriteableRegistry;
}
@Override
<C> C readNamedWriteable(Class<C> categoryClass) throws IOException {
String name = readString();
NamedWriteable<? extends C> namedWriteable = namedWriteableRegistry.getPrototype(categoryClass, name);
return namedWriteable.readFrom(this);
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.util.HashMap;
import java.util.Map;
/**
* Registry for {@link NamedWriteable} objects. Allows to register and retrieve prototype instances of writeable objects
* given their name.
*/
public class NamedWriteableRegistry {
private final Map<Class<?>, InnerRegistry<?>> registry = new HashMap<>();
/**
* Registers a {@link NamedWriteable} prototype given its category
*/
public synchronized <T> void registerPrototype(Class<T> categoryClass, NamedWriteable<? extends T> namedWriteable) {
@SuppressWarnings("unchecked")
InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
if (innerRegistry == null) {
innerRegistry = new InnerRegistry<>(categoryClass);
registry.put(categoryClass, innerRegistry);
}
innerRegistry.registerPrototype(namedWriteable);
}
/**
* Returns a prototype of the {@link NamedWriteable} object identified by the name provided as argument and its category
*/
public synchronized <T> NamedWriteable<? extends T> getPrototype(Class<T> categoryClass, String name) {
@SuppressWarnings("unchecked")
InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
if (innerRegistry == null) {
throw new IllegalArgumentException("unknown named writeable category [" + categoryClass.getName() + "]");
}
return innerRegistry.getPrototype(name);
}
private static class InnerRegistry<T> {
private final Map<String, NamedWriteable<? extends T>> registry = new HashMap<>();
private final Class<T> categoryClass;
private InnerRegistry(Class<T> categoryClass) {
this.categoryClass = categoryClass;
}
private void registerPrototype(NamedWriteable<? extends T> namedWriteable) {
NamedWriteable<? extends T> existingNamedWriteable = registry.get(namedWriteable.getWriteableName());
if (existingNamedWriteable != null) {
throw new IllegalArgumentException("named writeable of type [" + namedWriteable.getClass().getName() + "] with name [" + namedWriteable.getWriteableName() + "] " +
"is already registered by type [" + existingNamedWriteable.getClass().getName() + "] within category [" + categoryClass.getName() + "]");
}
registry.put(namedWriteable.getWriteableName(), namedWriteable);
}
private NamedWriteable<? extends T> getPrototype(String name) {
NamedWriteable<? extends T> namedWriteable = registry.get(name);
if (namedWriteable == null) {
throw new IllegalArgumentException("unknown named writeable with name [" + name + "] within category [" + categoryClass.getName() + "]");
}
return namedWriteable;
}
}
}

View File

@ -19,8 +19,6 @@
package org.elasticsearch.common.io.stream;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParseException;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@ -28,7 +26,6 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -42,14 +39,10 @@ import org.joda.time.DateTimeZone;
import java.io.*;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.regex.Pattern;
import static org.elasticsearch.ElasticsearchException.readException;
import static org.elasticsearch.ElasticsearchException.readStackTrace;
/**
*
*/
public abstract class StreamInput extends InputStream {
private Version version = Version.CURRENT;
@ -58,9 +51,8 @@ public abstract class StreamInput extends InputStream {
return this.version;
}
public StreamInput setVersion(Version version) {
public void setVersion(Version version) {
this.version = version;
return this;
}
/**
@ -256,7 +248,7 @@ public abstract class StreamInput extends InputStream {
final int charCount = readVInt();
spare.clear();
spare.grow(charCount);
int c = 0;
int c;
while (spare.length() < charCount) {
c = readByte() & 0xff;
switch (c >> 4) {
@ -348,6 +340,7 @@ public abstract class StreamInput extends InputStream {
}
@Nullable
@SuppressWarnings("unchecked")
public Map<String, Object> readMap() throws IOException {
return (Map<String, Object>) readGenericValue();
}
@ -556,6 +549,16 @@ public abstract class StreamInput extends InputStream {
return null;
}
/**
* Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for
* the corresponding entry in the registry by name, so that the proper object can be read and returned.
* Default implementation throws {@link UnsupportedOperationException} as StreamInput doesn't hold a registry.
* Use {@link FilterInputStream} instead which wraps a stream and supports a {@link NamedWriteableRegistry} too.
*/
<C> C readNamedWriteable(@SuppressWarnings("unused") Class<C> categoryClass) throws IOException {
throw new UnsupportedOperationException();
}
public static StreamInput wrap(BytesReference reference) {
if (reference.hasArray() == false) {
reference = reference.toBytesArray();
@ -570,5 +573,4 @@ public abstract class StreamInput extends InputStream {
public static StreamInput wrap(byte[] bytes, int offset, int length) {
return new InputStreamStreamInput(new ByteArrayInputStream(bytes, offset, length));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.io.stream;
import com.vividsolutions.jts.util.Assert;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@ -371,6 +370,7 @@ public abstract class StreamOutput extends OutputStream {
} else {
writeByte((byte) 10);
}
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
writeVInt(map.size());
for (Map.Entry<String, Object> entry : map.entrySet()) {
@ -416,31 +416,31 @@ public abstract class StreamOutput extends OutputStream {
}
}
public void writeIntArray(int[] value) throws IOException {
writeVInt(value.length);
for (int i=0; i<value.length; i++) {
writeInt(value[i]);
public void writeIntArray(int[] values) throws IOException {
writeVInt(values.length);
for (int value : values) {
writeInt(value);
}
}
public void writeLongArray(long[] value) throws IOException {
writeVInt(value.length);
for (int i=0; i<value.length; i++) {
writeLong(value[i]);
public void writeLongArray(long[] values) throws IOException {
writeVInt(values.length);
for (long value : values) {
writeLong(value);
}
}
public void writeFloatArray(float[] value) throws IOException {
writeVInt(value.length);
for (int i=0; i<value.length; i++) {
writeFloat(value[i]);
public void writeFloatArray(float[] values) throws IOException {
writeVInt(values.length);
for (float value : values) {
writeFloat(value);
}
}
public void writeDoubleArray(double[] value) throws IOException {
writeVInt(value.length);
for (int i=0; i<value.length; i++) {
writeDouble(value[i]);
public void writeDoubleArray(double[] values) throws IOException {
writeVInt(values.length);
for (double value : values) {
writeDouble(value);
}
}
@ -613,4 +613,12 @@ public abstract class StreamOutput extends OutputStream {
ElasticsearchException.writeStackTraces(throwable, this);
}
}
/**
* Writes a {@link NamedWriteable} to the current stream, by first writing its name and then the object itself
*/
void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
writeString(namedWriteable.getWriteableName());
namedWriteable.writeTo(this);
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
@ -85,8 +84,8 @@ import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportModule;
@ -254,7 +253,7 @@ public class Node implements Releasable {
injector.getInstance(MonitorService.class).start();
injector.getInstance(RestController.class).start();
// TODO hack around circular dependecncies problems
// TODO hack around circular dependencies problems
injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport;
import com.google.common.base.Preconditions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -64,6 +65,8 @@ public class TransportModule extends AbstractModule {
}
}
bind(NamedWriteableRegistry.class).asEagerSingleton();
if (configuredTransport != null) {
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
bind(Transport.class).to(configuredTransport).asEagerSingleton();

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -65,13 +67,14 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private final static ConcurrentMap<TransportAddress, LocalTransport> transports = newConcurrentMap();
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
private final NamedWriteableRegistry namedWriteableRegistry;
public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";
public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";
@Inject
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
public LocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings);
this.threadPool = threadPool;
this.version = version;
@ -81,6 +84,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory);
this.namedWriteableRegistry = namedWriteableRegistry;
}
@Override
@ -256,6 +260,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception {
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
final String action = stream.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -226,6 +227,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);

View File

@ -24,7 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.*;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
@ -32,6 +33,7 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
@ -149,6 +151,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
protected volatile TransportServiceAdapter transportServiceAdapter;
protected volatile BoundTransportAddress boundAddress;
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
protected final NamedWriteableRegistry namedWriteableRegistry;
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
@ -158,7 +161,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ScheduledPing scheduledPing;
@Inject
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings);
this.threadPool = threadPool;
this.networkService = networkService;
@ -214,6 +217,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing);
}
this.namedWriteableRegistry = namedWriteableRegistry;
}
public Settings settings() {
@ -969,7 +973,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory {
protected NettyTransport nettyTransport;
protected final NettyTransport nettyTransport;
public ClientChannelPipelineFactory(NettyTransport nettyTransport) {
this.nettyTransport = nettyTransport;

View File

@ -23,6 +23,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -59,10 +60,10 @@ public class BenchmarkNettyLargeMessages {
final ThreadPool threadPool = new ThreadPool("BenchmarkNettyLargeMessages");
final TransportService transportServiceServer = new TransportService(
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool
).start();
final TransportService transportServiceClient = new TransportService(
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool
).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.benchmark.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -44,13 +45,13 @@ public class TransportBenchmark {
LOCAL {
@Override
public Transport newTransport(Settings settings, ThreadPool threadPool) {
return new LocalTransport(settings, threadPool, Version.CURRENT);
return new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry());
}
},
NETTY {
@Override
public Transport newTransport(Settings settings, ThreadPool threadPool) {
return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
}
};

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
@ -168,7 +169,7 @@ public class ClusterStateDiffPublishingTests extends ESTestCase {
}
protected MockTransportService buildTransportService(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version), threadPool);
MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
transportService.start();
return transportService;
}

View File

@ -17,17 +17,17 @@
* under the License.
*/
package org.elasticsearch.common.io.streams;
package org.elasticsearch.common.io.stream;
import org.apache.lucene.util.Constants;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.util.Objects;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
@ -304,6 +304,131 @@ public class BytesStreamsTests extends ESTestCase {
out.close();
}
@Test
public void testNamedWriteable() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
out.writeNamedWriteable(namedWriteableIn);
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry);
BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class);
assertEquals(namedWriteableOut, namedWriteableIn);
}
@Test
public void testNamedWriteableDuplicates() throws IOException {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
try {
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
fail("registerPrototype should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("named writeable of type [" + TestNamedWriteable.class.getName() + "] with name [" + TestNamedWriteable.NAME + "] is already registered by type ["
+ TestNamedWriteable.class.getName() + "] within category [" + BaseNamedWriteable.class.getName() + "]"));
}
}
@Test
public void testNamedWriteableUnknownCategory() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.writeNamedWriteable(new TestNamedWriteable("test1", "test2"));
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), new NamedWriteableRegistry());
try {
//no named writeable registered with given name, can write but cannot read it back
in.readNamedWriteable(BaseNamedWriteable.class);
fail("read should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unknown named writeable category [" + BaseNamedWriteable.class.getName() + "]"));
}
}
@Test
public void testNamedWriteableUnknownNamedWriteable() throws IOException {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
BytesStreamOutput out = new BytesStreamOutput();
out.writeNamedWriteable(new NamedWriteable() {
@Override
public String getWriteableName() {
return "unknown";
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public Object readFrom(StreamInput in) throws IOException {
return null;
}
});
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry);
try {
//no named writeable registered with given name under test category, can write but cannot read it back
in.readNamedWriteable(BaseNamedWriteable.class);
fail("read should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unknown named writeable with name [unknown] within category [" + BaseNamedWriteable.class.getName() + "]"));
}
}
@Test(expected = UnsupportedOperationException.class)
public void testNamedWriteableNotSupportedWithoutWrapping() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
TestNamedWriteable testNamedWriteable = new TestNamedWriteable("test1", "test2");
out.writeNamedWriteable(testNamedWriteable);
StreamInput in = StreamInput.wrap(out.bytes().toBytes());
in.readNamedWriteable(BaseNamedWriteable.class);
}
private static abstract class BaseNamedWriteable<T> implements NamedWriteable<T> {
}
private static class TestNamedWriteable extends BaseNamedWriteable<TestNamedWriteable> {
private static final String NAME = "test-named-writeable";
private final String field1;
private final String field2;
TestNamedWriteable(String field1, String field2) {
this.field1 = field1;
this.field2 = field2;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(field1);
out.writeString(field2);
}
@Override
public TestNamedWriteable readFrom(StreamInput in) throws IOException {
return new TestNamedWriteable(in.readString(), in.readString());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestNamedWriteable that = (TestNamedWriteable) o;
return Objects.equals(field1, that.field1) &&
Objects.equals(field2, that.field2);
}
@Override
public int hashCode() {
return Objects.hash(field1, field2);
}
}
// we ignore this test for now since all existing callers of BytesStreamOutput happily
// call bytes() after close().
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12620")

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
@ -105,7 +106,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
}
protected MockTransportService build(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
transportService.start();
return transportService;
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -62,10 +63,10 @@ public class MulticastZenPingIT extends ESTestCase {
ThreadPool threadPool = new ThreadPool("testSimplePings");
final ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
@ -135,7 +136,7 @@ public class MulticastZenPingIT extends ESTestCase {
final ThreadPool threadPool = new ThreadPool("testExternalPing");
final ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -54,13 +55,13 @@ public class UnicastZenPingIT extends ESTestCase {
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.plugins;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.AssertingLocalTransport;
@ -91,8 +92,8 @@ public class PluggableTransportModuleIT extends ESIntegTestCase {
public static final class CountingAssertingLocalTransport extends AssertingLocalTransport {
@Inject
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings, threadPool, version);
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, version, namedWriteableRegistry);
}
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.test.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.VersionUtils;
@ -45,8 +46,8 @@ public class AssertingLocalTransport extends LocalTransport {
private final Version maxVersion;
@Inject
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings, threadPool, version);
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, version, namedWriteableRegistry);
final long seed = settings.getAsLong(ESIntegTestCase.SETTING_INDEX_SEED, 0l);
random = new Random(seed);
minVersion = settings.getAsVersion(ASSERTING_TRANSPORT_MIN_VERSION_KEY, Version.V_0_18_0);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.google.common.base.Charsets;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -62,7 +63,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
threadPool.setNodeSettingsService(new NodeSettingsService(settings));
NetworkService networkService = new NetworkService(settings);
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT);
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry());
nettyTransport.start();
TransportService transportService = new TransportService(nettyTransport, threadPool);
nettyTransport.transportServiceAdapter(transportService.createAdapter());

View File

@ -20,6 +20,7 @@
package org.elasticsearch.transport.local;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.AbstractSimpleTransportTests;
@ -28,7 +29,7 @@ public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
@Override
protected MockTransportService build(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
transportService.start();
return transportService;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -48,11 +49,11 @@ public class NettyScheduledPingTests extends ESTestCase {
int endPort = startPort + 10;
Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", startPort + "-" + endPort).build();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
serviceA.start();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
serviceB.start();

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkService;
@ -34,7 +35,10 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequest;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -83,8 +87,8 @@ public class NettyTransportIT extends ESIntegTestCase {
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings, threadPool, networkService, bigArrays, version);
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry);
}
@Override

View File

@ -20,10 +20,10 @@ package org.elasticsearch.transport.netty;
import com.carrotsearch.hppc.IntHashSet;
import com.google.common.base.Charsets;
import org.elasticsearch.Version;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
@ -213,7 +213,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT);
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, new NamedWriteableRegistry());
nettyTransport.start();
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -37,7 +38,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
int startPort = 11000 + randomIntBetween(0, 255);
int endPort = startPort + 10;
settings = Settings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build();
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version), threadPool);
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry()), threadPool);
transportService.start();
return transportService;
}