mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Transport: allow to de-serialize arbitrary objects given their name
As part of the query refactoring, we want to be able to serialize queries by having them extend Writeable, rather than serializing their json. When reading them though, we need to be able to identify which query we have to create, based on its name. For this purpose we introduce a new abstraction called NamedWriteable, which is supported by StreamOutput and StreamInput through writeNamedWriteable and readNamedWriteable methods. A new NamedWriteableRegistry is introduced also where named writeable prototypes need to be registered so that we are able to retrieve the proper instance of query given its name and then de-serialize it calling readFrom against it. Closes #11553
This commit is contained in:
parent
5f66f68135
commit
ff9041dc48
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Wraps a {@link StreamInput} and associates it with a {@link NamedWriteableRegistry}
|
||||
*/
|
||||
public class FilterStreamInput extends StreamInput {
|
||||
|
||||
private final StreamInput delegate;
|
||||
|
||||
public FilterStreamInput(StreamInput delegate, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(namedWriteableRegistry);
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
return delegate.readByte();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||
delegate.readBytes(b, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
delegate.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return delegate.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
/**
|
||||
* A {@link Writeable} object identified by its name.
|
||||
* To be used for arbitrary serializable objects (e.g. queries); when reading them, their name tells
|
||||
* which specific object needs to be created.
|
||||
*/
|
||||
public interface NamedWriteable<T> extends Writeable<T> {
|
||||
|
||||
/**
|
||||
* Returns the name of the writeable object
|
||||
*/
|
||||
String getName();
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Registry for {@link NamedWriteable} objects. Allows to register and retrieve prototype instances of writeable objects
|
||||
* given their name.
|
||||
*/
|
||||
public class NamedWriteableRegistry {
|
||||
|
||||
private Map<String, NamedWriteable> registry = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Registers a {@link NamedWriteable} prototype
|
||||
*/
|
||||
public synchronized void registerPrototype(NamedWriteable<?> namedWriteable) {
|
||||
if (registry.containsKey(namedWriteable.getName())) {
|
||||
throw new IllegalArgumentException("named writeable of type [" + namedWriteable.getClass().getName() + "] with name [" + namedWriteable.getName() + "] " +
|
||||
"is already registered by type [" + registry.get(namedWriteable.getName()).getClass().getName() + "]");
|
||||
}
|
||||
registry.put(namedWriteable.getName(), namedWriteable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a prototype of the {@link NamedWriteable} object identified by the name provided as argument
|
||||
*/
|
||||
public <C> NamedWriteable<C> getPrototype(String name) {
|
||||
@SuppressWarnings("unchecked")
|
||||
NamedWriteable<C> namedWriteable = (NamedWriteable<C>)registry.get(name);
|
||||
if (namedWriteable == null) {
|
||||
throw new IllegalArgumentException("unknown named writeable with name [" + name + "]");
|
||||
}
|
||||
return namedWriteable;
|
||||
}
|
||||
}
|
@ -41,8 +41,18 @@ import java.util.*;
|
||||
*/
|
||||
public abstract class StreamInput extends InputStream {
|
||||
|
||||
private final NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
private Version version = Version.CURRENT;
|
||||
|
||||
protected StreamInput() {
|
||||
this.namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
}
|
||||
|
||||
protected StreamInput(NamedWriteableRegistry namedWriteableRegistry) {
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
public Version getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
@ -245,7 +255,7 @@ public abstract class StreamInput extends InputStream {
|
||||
final int charCount = readVInt();
|
||||
spare.clear();
|
||||
spare.grow(charCount);
|
||||
int c = 0;
|
||||
int c;
|
||||
while (spare.length() < charCount) {
|
||||
c = readByte() & 0xff;
|
||||
switch (c >> 4) {
|
||||
@ -337,6 +347,7 @@ public abstract class StreamInput extends InputStream {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@SuppressWarnings("unchecked")
|
||||
public Map<String, Object> readMap() throws IOException {
|
||||
return (Map<String, Object>) readGenericValue();
|
||||
}
|
||||
@ -480,12 +491,24 @@ public abstract class StreamInput extends InputStream {
|
||||
public <T extends Throwable> T readThrowable() throws IOException {
|
||||
try {
|
||||
ObjectInputStream oin = new ObjectInputStream(this);
|
||||
return (T) oin.readObject();
|
||||
@SuppressWarnings("unchecked")
|
||||
T object = (T) oin.readObject();
|
||||
return object;
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException("failed to deserialize exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for
|
||||
* the corresponding entry in the registry by name, so that the proper object can be read and returned.
|
||||
*/
|
||||
public <C> C readNamedWriteable() throws IOException {
|
||||
String name = readString();
|
||||
NamedWriteable<C> namedWriteable = namedWriteableRegistry.getPrototype(name);
|
||||
return namedWriteable.readFrom(this);
|
||||
}
|
||||
|
||||
public static StreamInput wrap(BytesReference reference) {
|
||||
if (reference.hasArray() == false) {
|
||||
reference = reference.toBytesArray();
|
||||
|
@ -359,6 +359,7 @@ public abstract class StreamOutput extends OutputStream {
|
||||
} else {
|
||||
writeByte((byte) 10);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
writeVInt(map.size());
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
@ -403,31 +404,31 @@ public abstract class StreamOutput extends OutputStream {
|
||||
}
|
||||
}
|
||||
|
||||
public void writeIntArray(int[] value) throws IOException {
|
||||
writeVInt(value.length);
|
||||
for (int i=0; i<value.length; i++) {
|
||||
writeInt(value[i]);
|
||||
public void writeIntArray(int[] values) throws IOException {
|
||||
writeVInt(values.length);
|
||||
for (int value : values) {
|
||||
writeInt(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeLongArray(long[] value) throws IOException {
|
||||
writeVInt(value.length);
|
||||
for (int i=0; i<value.length; i++) {
|
||||
writeLong(value[i]);
|
||||
public void writeLongArray(long[] values) throws IOException {
|
||||
writeVInt(values.length);
|
||||
for (long value : values) {
|
||||
writeLong(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeFloatArray(float[] value) throws IOException {
|
||||
writeVInt(value.length);
|
||||
for (int i=0; i<value.length; i++) {
|
||||
writeFloat(value[i]);
|
||||
public void writeFloatArray(float[] values) throws IOException {
|
||||
writeVInt(values.length);
|
||||
for (float value : values) {
|
||||
writeFloat(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeDoubleArray(double[] value) throws IOException {
|
||||
writeVInt(value.length);
|
||||
for (int i=0; i<value.length; i++) {
|
||||
writeDouble(value[i]);
|
||||
public void writeDoubleArray(double[] values) throws IOException {
|
||||
writeVInt(values.length);
|
||||
for (double value : values) {
|
||||
writeDouble(value);
|
||||
}
|
||||
}
|
||||
|
||||
@ -448,4 +449,12 @@ public abstract class StreamOutput extends OutputStream {
|
||||
out.writeObject(throwable);
|
||||
out.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link NamedWriteable} to the current stream, by first writing its name and then the object itself
|
||||
*/
|
||||
public void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
|
||||
writeString(namedWriteable.getName());
|
||||
namedWriteable.writeTo(this);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.query;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A filter that simply wraps a query. Same as the {@link QueryFilterBuilder} except that it allows also to
|
||||
* associate a name with the query filter.
|
||||
* @deprecated Useless now that queries and filters are merged: pass the
|
||||
* query as a filter directly.
|
||||
*/
|
||||
@Deprecated
|
||||
public class FQueryFilterBuilder extends QueryFilterBuilder {
|
||||
|
||||
public static final String NAME = "fquery";
|
||||
|
||||
static final FQueryFilterBuilder PROTOTYPE = new FQueryFilterBuilder(null);
|
||||
|
||||
/**
|
||||
* A filter that simply wraps a query.
|
||||
*
|
||||
* @param queryBuilder The query to wrap as a filter
|
||||
*/
|
||||
public FQueryFilterBuilder(QueryBuilder queryBuilder) {
|
||||
super(queryBuilder);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
buildFQuery(builder, params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String queryId() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
@ -39,7 +39,7 @@ public class FQueryFilterParser extends BaseQueryParserTemp {
|
||||
|
||||
@Override
|
||||
public String[] names() {
|
||||
return new String[]{QueryFilterBuilder.FQUERY_NAME};
|
||||
return new String[]{FQueryFilterBuilder.NAME};
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -86,7 +86,7 @@ public class FQueryFilterParser extends BaseQueryParserTemp {
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryFilterBuilder getBuilderPrototype() {
|
||||
return QueryFilterBuilder.PROTOTYPE;
|
||||
public FQueryFilterBuilder getBuilderPrototype() {
|
||||
return FQueryFilterBuilder.PROTOTYPE;
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,12 @@
|
||||
package org.elasticsearch.index.query;
|
||||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.lucene.BytesRefs;
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lucene.BytesRefs;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
@ -35,7 +35,7 @@ import java.io.IOException;
|
||||
* Base class for all classes producing lucene queries.
|
||||
* Supports conversion to BytesReference and creation of lucene Query objects.
|
||||
*/
|
||||
public abstract class QueryBuilder<QB extends QueryBuilder> extends ToXContentToBytes implements Writeable<QB> {
|
||||
public abstract class QueryBuilder<QB extends QueryBuilder> extends ToXContentToBytes implements NamedWriteable<QB> {
|
||||
|
||||
protected QueryBuilder() {
|
||||
super(XContentType.JSON);
|
||||
@ -54,6 +54,11 @@ public abstract class QueryBuilder<QB extends QueryBuilder> extends ToXContentTo
|
||||
*/
|
||||
public abstract String queryId();
|
||||
|
||||
@Override
|
||||
public final String getName() {
|
||||
return queryId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts this QueryBuilder to a lucene {@link Query}
|
||||
* @param parseContext additional information needed to construct the queries
|
||||
|
@ -33,9 +33,6 @@ public class QueryFilterBuilder extends QueryBuilder {
|
||||
|
||||
public static final String NAME = "query";
|
||||
|
||||
// this query builder creates query parsed by FQueryFilterParser in case queryName is set
|
||||
public static final String FQUERY_NAME = "fquery";
|
||||
|
||||
private final QueryBuilder queryBuilder;
|
||||
|
||||
private String queryName;
|
||||
@ -65,16 +62,21 @@ public class QueryFilterBuilder extends QueryBuilder {
|
||||
builder.field(NAME);
|
||||
queryBuilder.toXContent(builder, params);
|
||||
} else {
|
||||
builder.startObject(FQUERY_NAME);
|
||||
builder.field("query");
|
||||
queryBuilder.toXContent(builder, params);
|
||||
if (queryName != null) {
|
||||
builder.field("_name", queryName);
|
||||
}
|
||||
builder.endObject();
|
||||
//fallback fo fquery when needed, for bw comp
|
||||
buildFQuery(builder, params);
|
||||
}
|
||||
}
|
||||
|
||||
protected void buildFQuery(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(FQueryFilterBuilder.NAME);
|
||||
builder.field("query");
|
||||
queryBuilder.toXContent(builder, params);
|
||||
if (queryName != null) {
|
||||
builder.field("_name", queryName);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String queryId() {
|
||||
return NAME;
|
||||
|
@ -32,6 +32,9 @@ public class IndicesQueriesModule extends AbstractModule {
|
||||
|
||||
private Set<Class<? extends QueryParser>> queryParsersClasses = Sets.newHashSet();
|
||||
|
||||
/**
|
||||
* Registers a {@link QueryParser} given its class
|
||||
*/
|
||||
public synchronized IndicesQueriesModule addQuery(Class<? extends QueryParser> queryParser) {
|
||||
queryParsersClasses.add(queryParser);
|
||||
return this;
|
||||
@ -83,7 +86,6 @@ public class IndicesQueriesModule extends AbstractModule {
|
||||
qpBinders.addBinding().to(TemplateQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(TypeQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(LimitQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(TermsQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(ScriptQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(GeoDistanceQueryParser.class).asEagerSingleton();
|
||||
qpBinders.addBinding().to(GeoDistanceRangeQueryParser.class).asEagerSingleton();
|
||||
|
@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryParser;
|
||||
|
||||
@ -35,13 +36,14 @@ public class IndicesQueriesRegistry extends AbstractComponent {
|
||||
private ImmutableMap<String, QueryParser> queryParsers;
|
||||
|
||||
@Inject
|
||||
public IndicesQueriesRegistry(Settings settings, Set<QueryParser> injectedQueryParsers) {
|
||||
public IndicesQueriesRegistry(Settings settings, Set<QueryParser> injectedQueryParsers, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings);
|
||||
Map<String, QueryParser> queryParsers = Maps.newHashMap();
|
||||
for (QueryParser queryParser : injectedQueryParsers) {
|
||||
for (String name : queryParser.names()) {
|
||||
queryParsers.put(name, queryParser);
|
||||
}
|
||||
namedWriteableRegistry.registerPrototype(queryParser.getBuilderPrototype());
|
||||
}
|
||||
this.queryParsers = ImmutableMap.copyOf(queryParsers);
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.transport;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -64,6 +65,8 @@ public class TransportModule extends AbstractModule {
|
||||
}
|
||||
}
|
||||
|
||||
bind(NamedWriteableRegistry.class).asEagerSingleton();
|
||||
|
||||
if (configuredTransport != null) {
|
||||
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
|
||||
bind(Transport.class).to(configuredTransport).asEagerSingleton();
|
||||
|
@ -27,8 +27,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.ThrowableObjectInputStream;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.*;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
@ -66,13 +65,14 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
private final static ConcurrentMap<TransportAddress, LocalTransport> transports = newConcurrentMap();
|
||||
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
|
||||
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
|
||||
private final NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
|
||||
public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";
|
||||
public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";
|
||||
|
||||
@Inject
|
||||
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
|
||||
public LocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.version = version;
|
||||
@ -82,6 +82,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
|
||||
this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory);
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -224,7 +225,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
Transports.assertTransportThread();
|
||||
try {
|
||||
transportServiceAdapter.received(data.length);
|
||||
StreamInput stream = StreamInput.wrap(data);
|
||||
StreamInput stream = new FilterStreamInput(StreamInput.wrap(data), namedWriteableRegistry);
|
||||
stream.setVersion(version);
|
||||
|
||||
long requestId = stream.readLong();
|
||||
|
@ -25,6 +25,8 @@ import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.compress.NotCompressedException;
|
||||
import org.elasticsearch.common.io.ThrowableObjectInputStream;
|
||||
import org.elasticsearch.common.io.stream.FilterStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -49,13 +51,19 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
private final NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
|
||||
this(transport, logger, profileName, new NamedWriteableRegistry());
|
||||
}
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
this.threadPool = transport.threadPool();
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.logger = logger;
|
||||
this.profileName = profileName;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -109,6 +117,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
} else {
|
||||
wrappedStream = streamIn;
|
||||
}
|
||||
wrappedStream = new FilterStreamInput(wrappedStream, namedWriteableRegistry);
|
||||
wrappedStream.setVersion(version);
|
||||
|
||||
if (TransportStatus.isRequest(status)) {
|
||||
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
@ -156,8 +157,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
// package visibility for tests
|
||||
final ScheduledPing scheduledPing;
|
||||
|
||||
protected final NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
@Inject
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.networkService = networkService;
|
||||
@ -213,6 +216,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
if (pingSchedule.millis() > 0) {
|
||||
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing);
|
||||
}
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
public Settings settings() {
|
||||
@ -996,7 +1000,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
}
|
||||
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings settings) {
|
||||
return new ServerChannelPipelineFactory(this, name, settings);
|
||||
return new ServerChannelPipelineFactory(this, name, settings, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
protected static class ServerChannelPipelineFactory implements ChannelPipelineFactory {
|
||||
@ -1004,11 +1008,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
protected final NettyTransport nettyTransport;
|
||||
protected final String name;
|
||||
protected final Settings settings;
|
||||
protected final NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
public ServerChannelPipelineFactory(NettyTransport nettyTransport, String name, Settings settings) {
|
||||
public ServerChannelPipelineFactory(NettyTransport nettyTransport, String name, Settings settings, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
this.nettyTransport = nettyTransport;
|
||||
this.name = name;
|
||||
this.settings = settings;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1027,7 +1033,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
|
||||
}
|
||||
channelPipeline.addLast("size", sizeHeader);
|
||||
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name));
|
||||
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name, namedWriteableRegistry));
|
||||
return channelPipeline;
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.settings.DynamicSettings;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -59,10 +60,10 @@ public class BenchmarkNettyLargeMessages {
|
||||
|
||||
final ThreadPool threadPool = new ThreadPool("BenchmarkNettyLargeMessages");
|
||||
final TransportService transportServiceServer = new TransportService(
|
||||
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
|
||||
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool
|
||||
).start();
|
||||
final TransportService transportServiceClient = new TransportService(
|
||||
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool
|
||||
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool
|
||||
).start();
|
||||
|
||||
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.benchmark.transport;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
@ -44,13 +45,13 @@ public class TransportBenchmark {
|
||||
LOCAL {
|
||||
@Override
|
||||
public Transport newTransport(Settings settings, ThreadPool threadPool) {
|
||||
return new LocalTransport(settings, threadPool, Version.CURRENT);
|
||||
return new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry());
|
||||
}
|
||||
},
|
||||
NETTY {
|
||||
@Override
|
||||
public Transport newTransport(Settings settings, ThreadPool threadPool) {
|
||||
return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
return new NettyTransport(settings, threadPool, new NetworkService(Settings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
@ -168,7 +169,7 @@ public class ClusterStateDiffPublishingTests extends ElasticsearchTestCase {
|
||||
}
|
||||
|
||||
protected MockTransportService buildTransportService(Settings settings, Version version) {
|
||||
MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version), threadPool);
|
||||
MockTransportService transportService = new MockTransportService(settings, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
@ -21,13 +21,20 @@ package org.elasticsearch.common.io.streams;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.FilterStreamInput;
|
||||
import org.elasticsearch.common.lucene.BytesRefs;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.closeTo;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ -304,6 +311,50 @@ public class BytesStreamsTests extends ElasticsearchTestCase {
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamedWriteable() throws IOException {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
namedWriteableRegistry.registerPrototype(new TermQueryBuilder(null, null));
|
||||
TermQueryBuilder termQueryBuilder = new TermQueryBuilder(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
|
||||
out.writeNamedWriteable(termQueryBuilder);
|
||||
StreamInput in = new FilterStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry);
|
||||
QueryBuilder queryBuilder = in.readNamedWriteable();
|
||||
assertThat(queryBuilder, equalTo((QueryBuilder)termQueryBuilder));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamedWriteableDuplicates() throws IOException {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
namedWriteableRegistry.registerPrototype(new TermQueryBuilder(null, null));
|
||||
try {
|
||||
//wrong class, no registry available
|
||||
namedWriteableRegistry.registerPrototype(new TermQueryBuilder(null, null));
|
||||
fail("registerPrototype should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("named writeable of type [" + TermQueryBuilder.class.getName() + "] with name [" + TermQueryBuilder.NAME + "] is already registered by type [" + TermQueryBuilder.class.getName() + "]"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamedWriteableUnknownNamedWriteable() throws IOException {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
out.writeNamedWriteable(new MatchAllQueryBuilder());
|
||||
StreamInput in = StreamInput.wrap(out.bytes().toBytes());
|
||||
if (randomBoolean()) {
|
||||
in = new FilterStreamInput(in, namedWriteableRegistry);
|
||||
}
|
||||
try {
|
||||
//no match_all named writeable registered, can write but cannot read it back
|
||||
in.readNamedWriteable();
|
||||
fail("read should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("unknown named writeable with name [" + MatchAllQueryBuilder.NAME + "]"));
|
||||
}
|
||||
}
|
||||
|
||||
// we ignore this test for now since all existing callers of BytesStreamOutput happily
|
||||
// call bytes() after close().
|
||||
@Ignore
|
||||
|
@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
|
||||
@ -105,7 +106,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
|
||||
}
|
||||
|
||||
protected MockTransportService build(Settings settings, Version version) {
|
||||
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
|
||||
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, new NamedWriteableRegistry()), threadPool);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -65,10 +66,10 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("testSimplePings");
|
||||
final ClusterName clusterName = new ClusterName("test");
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
|
||||
@ -138,7 +139,7 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
|
||||
|
||||
final ThreadPool threadPool = new ThreadPool("testExternalPing");
|
||||
final ClusterName clusterName = new ClusterName("test");
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
|
||||
|
@ -23,6 +23,7 @@ import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -59,13 +60,13 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
ElectMasterService electMasterService = new ElectMasterService(settings);
|
||||
|
||||
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
|
||||
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
|
||||
|
||||
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
|
||||
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
|
||||
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
|
@ -33,6 +33,8 @@ import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.inject.ModulesBuilder;
|
||||
import org.elasticsearch.common.inject.util.Providers;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.FilterStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
@ -89,6 +91,8 @@ public abstract class BaseQueryTestCase<QB extends QueryBuilder<QB>> extends Ela
|
||||
return currentTypes;
|
||||
}
|
||||
|
||||
private static NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
/**
|
||||
* Setup for the whole base test class.
|
||||
* @throws IOException
|
||||
@ -119,6 +123,7 @@ public abstract class BaseQueryTestCase<QB extends QueryBuilder<QB>> extends Ela
|
||||
protected void configure() {
|
||||
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
|
||||
bind(CircuitBreakerService.class).to(NoneCircuitBreakerService.class);
|
||||
bind(NamedWriteableRegistry.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
).createInjector();
|
||||
@ -136,6 +141,7 @@ public abstract class BaseQueryTestCase<QB extends QueryBuilder<QB>> extends Ela
|
||||
STRING_FIELD_NAME, "type=string").string()), false);
|
||||
currentTypes[i] = type;
|
||||
}
|
||||
namedWriteableRegistry = injector.getInstance(NamedWriteableRegistry.class);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@ -252,7 +258,7 @@ public abstract class BaseQueryTestCase<QB extends QueryBuilder<QB>> extends Ela
|
||||
QB testQuery = createTestQueryBuilder();
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
testQuery.writeTo(output);
|
||||
try (StreamInput in = StreamInput.wrap(output.bytes())) {
|
||||
try (StreamInput in = new FilterStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
QB deserializedQuery = createEmptyQueryBuilder().readFrom(in);
|
||||
assertEquals(deserializedQuery, testQuery);
|
||||
assertEquals(deserializedQuery.hashCode(), testQuery.hashCode());
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.plugins;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.transport.AssertingLocalTransport;
|
||||
@ -91,8 +92,8 @@ public class PluggableTransportModuleTests extends ElasticsearchIntegrationTest
|
||||
public static final class CountingAssertingLocalTransport extends AssertingLocalTransport {
|
||||
|
||||
@Inject
|
||||
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
|
||||
super(settings, threadPool, version);
|
||||
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, version, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.test.transport;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
@ -45,8 +46,8 @@ public class AssertingLocalTransport extends LocalTransport {
|
||||
private final Version maxVersion;
|
||||
|
||||
@Inject
|
||||
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
|
||||
super(settings, threadPool, version);
|
||||
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, version, namedWriteableRegistry);
|
||||
final long seed = settings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||
random = new Random(seed);
|
||||
minVersion = settings.getAsVersion(ASSERTING_TRANSPORT_MIN_VERSION_KEY, Version.V_0_18_0);
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.transport;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -51,6 +52,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
|
||||
|
||||
protected ThreadPool threadPool;
|
||||
|
||||
protected static final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
protected static final Version version0 = Version.fromId(/*0*/99);
|
||||
protected DiscoveryNode nodeA;
|
||||
protected MockTransportService serviceA;
|
||||
@ -59,7 +61,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
|
||||
protected DiscoveryNode nodeB;
|
||||
protected MockTransportService serviceB;
|
||||
|
||||
protected abstract MockTransportService build(Settings settings, Version version);
|
||||
protected abstract MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry);
|
||||
|
||||
@Override
|
||||
@Before
|
||||
@ -68,12 +70,14 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
serviceA = build(
|
||||
Settings.builder().put("name", "TS_A", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
|
||||
version0
|
||||
version0,
|
||||
namedWriteableRegistry
|
||||
);
|
||||
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version0);
|
||||
serviceB = build(
|
||||
Settings.builder().put("name", "TS_B", TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
|
||||
version1
|
||||
version1,
|
||||
namedWriteableRegistry
|
||||
);
|
||||
nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version1);
|
||||
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.transport;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -62,7 +63,7 @@ public class NettySizeHeaderFrameDecoderTests extends ElasticsearchTestCase {
|
||||
threadPool.setNodeSettingsService(new NodeSettingsService(settings));
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT);
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry());
|
||||
nettyTransport.start();
|
||||
TransportService transportService = new TransportService(nettyTransport, threadPool);
|
||||
nettyTransport.transportServiceAdapter(transportService.createAdapter());
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.transport.local;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTests;
|
||||
@ -27,8 +28,8 @@ import org.elasticsearch.transport.AbstractSimpleTransportTests;
|
||||
public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
|
||||
|
||||
@Override
|
||||
protected MockTransportService build(Settings settings, Version version) {
|
||||
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
|
||||
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
MockTransportService transportService = new MockTransportService(Settings.EMPTY, new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -48,11 +49,11 @@ public class NettyScheduledPingTests extends ElasticsearchTestCase {
|
||||
int endPort = startPort + 10;
|
||||
Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", startPort + "-" + endPort).build();
|
||||
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
|
||||
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
|
||||
serviceA.start();
|
||||
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
|
||||
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
|
||||
serviceB.start();
|
||||
|
||||
|
@ -23,6 +23,7 @@ import com.google.common.base.Charsets;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -195,7 +196,7 @@ public class NettyTransportMultiPortTests extends ElasticsearchTestCase {
|
||||
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
|
||||
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT);
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, new NamedWriteableRegistry());
|
||||
nettyTransport.start();
|
||||
|
||||
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
@ -83,21 +84,21 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
|
||||
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
|
||||
|
||||
@Inject
|
||||
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
|
||||
super(settings, threadPool, networkService, bigArrays, version);
|
||||
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
|
||||
return new ErrorPipelineFactory(this, name, groupSettings);
|
||||
return new ErrorPipelineFactory(this, name, groupSettings, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport, String name, Settings groupSettings) {
|
||||
super(exceptionThrowingNettyTransport, name, groupSettings);
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport, String name, Settings groupSettings, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(exceptionThrowingNettyTransport, name, groupSettings, namedWriteableRegistry);
|
||||
this.logger = exceptionThrowingNettyTransport.logger;
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
@ -35,11 +36,11 @@ import org.junit.Test;
|
||||
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
|
||||
|
||||
@Override
|
||||
protected MockTransportService build(Settings settings, Version version) {
|
||||
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
int startPort = 11000 + randomIntBetween(0, 255);
|
||||
int endPort = startPort + 10;
|
||||
settings = Settings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build();
|
||||
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version), threadPool);
|
||||
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry), threadPool);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user