Merge branch 'master' into pr/19144-discovery-azure-classic
# Conflicts: # plugins/discovery-azure-classic/LICENSE.txt
This commit is contained in:
commit
8a2b27076e
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -496,7 +497,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
|
||||
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class,
|
||||
org.elasticsearch.search.SearchContextMissingException::new, 24),
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
org.elasticsearch.script.GeneralScriptException::new, 25),
|
||||
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
|
||||
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
|
||||
|
@ -676,8 +677,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.indices.IndexAlreadyExistsException::new, 123),
|
||||
SCRIPT_PARSE_EXCEPTION(org.elasticsearch.script.Script.ScriptParseException.class,
|
||||
org.elasticsearch.script.Script.ScriptParseException::new, 124),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class,
|
||||
org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException::new, 125),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpOnTransportException.class,
|
||||
TcpTransport.HttpOnTransportException::new, 125),
|
||||
MAPPER_PARSING_EXCEPTION(org.elasticsearch.index.mapper.MapperParsingException.class,
|
||||
org.elasticsearch.index.mapper.MapperParsingException::new, 126),
|
||||
SEARCH_CONTEXT_EXCEPTION(org.elasticsearch.search.SearchContextException.class,
|
||||
|
|
|
@ -50,8 +50,8 @@ import org.elasticsearch.plugins.PluginsService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
|
@ -107,7 +107,7 @@ public class TransportClient extends AbstractClient {
|
|||
|
||||
private PluginsService newPluginService(final Settings settings) {
|
||||
final Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(InternalSettingsPreparer.prepareSettings(settings))
|
||||
.put(NetworkService.NETWORK_SERVER.getKey(), false)
|
||||
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
@ -159,25 +158,6 @@ public final class Channels {
|
|||
return bytesRead;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel}
|
||||
*
|
||||
* @param source ChannelBuffer to copy from
|
||||
* @param sourceIndex index in <i>source</i> to start copying from
|
||||
* @param length how many bytes to copy
|
||||
* @param channel target GatheringByteChannel
|
||||
*/
|
||||
public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
|
||||
while (length > 0) {
|
||||
int written = source.getBytes(sourceIndex, channel, length);
|
||||
sourceIndex += written;
|
||||
length -= written;
|
||||
}
|
||||
assert length == 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
|
||||
*
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
|
||||
/**
|
||||
* A marker to not remove frame decoder from the resulting jar so plugins can use it.
|
||||
*/
|
||||
public class KeepFrameDecoder extends FrameDecoder {
|
||||
|
||||
public static final KeepFrameDecoder decoder = new KeepFrameDecoder();
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
/**
|
||||
* A channel listener that releases a {@link org.elasticsearch.common.lease.Releasable} when
|
||||
* the operation is complete.
|
||||
*/
|
||||
public class ReleaseChannelFutureListener implements ChannelFutureListener {
|
||||
|
||||
private final Releasable releasable;
|
||||
|
||||
public ReleaseChannelFutureListener(Releasable releasable) {
|
||||
this.releasable = releasable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
releasable.close();
|
||||
}
|
||||
}
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
@ -279,14 +280,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
TransportSettings.PUBLISH_PORT,
|
||||
TransportSettings.PORT,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_REG,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_PING,
|
||||
NettyTransport.PING_SCHEDULE,
|
||||
NettyTransport.TCP_BLOCKING_CLIENT,
|
||||
NettyTransport.TCP_CONNECT_TIMEOUT,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_PING,
|
||||
TcpTransport.PING_SCHEDULE,
|
||||
TcpTransport.TCP_BLOCKING_CLIENT,
|
||||
TcpTransport.TCP_CONNECT_TIMEOUT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
|
@ -294,12 +295,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NetworkService.NETWORK_SERVER,
|
||||
NettyTransport.NETTY_BOSS_COUNT,
|
||||
NettyTransport.TCP_NO_DELAY,
|
||||
NettyTransport.TCP_KEEP_ALIVE,
|
||||
NettyTransport.TCP_REUSE_ADDRESS,
|
||||
NettyTransport.TCP_SEND_BUFFER_SIZE,
|
||||
NettyTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
NettyTransport.TCP_BLOCKING_SERVER,
|
||||
TcpTransport.TCP_NO_DELAY,
|
||||
TcpTransport.TCP_KEEP_ALIVE,
|
||||
TcpTransport.TCP_REUSE_ADDRESS,
|
||||
TcpTransport.TCP_SEND_BUFFER_SIZE,
|
||||
TcpTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
TcpTransport.TCP_BLOCKING_SERVER,
|
||||
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
|
|
|
@ -24,8 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
|
@ -128,7 +127,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
}
|
||||
|
||||
if (content instanceof Releasable) {
|
||||
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
|
||||
future.addListener((x) -> ((Releasable)content).close());
|
||||
addedReleaseListener = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.http.netty;
|
|||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
|
|
@ -24,8 +24,8 @@ import com.carrotsearch.hppc.IntSet;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TcpHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
|
||||
output.writeByte((byte)'E');
|
||||
output.writeByte((byte)'S');
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
output.writeInt(messageSize - TcpHeader.MARKER_BYTES_SIZE - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
output.writeLong(requestId);
|
||||
output.writeByte(status);
|
||||
output.writeInt(version.id);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public final class TcpTransportChannel<Channel> implements TransportChannel {
|
||||
private final TcpTransport<Channel> transport;
|
||||
protected final Version version;
|
||||
protected final String action;
|
||||
protected final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
private final String channelType;
|
||||
private final Channel channel;
|
||||
|
||||
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.version = version;
|
||||
this.channel = channel;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
this.channelType = channelType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
transport.sendResponse(version, channel, response, requestId, action, options);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
transport.sendErrorResponse(version, channel, error, requestId, action);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.getInFlightRequestBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getChannelType() {
|
||||
return channelType;
|
||||
}
|
||||
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -94,4 +96,9 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
long serverOpen();
|
||||
|
||||
List<String> getLocalAddresses();
|
||||
|
||||
default CircuitBreaker getInFlightRequestBreaker() {
|
||||
return new NoopCircuitBreaker("in-flight-noop");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -39,10 +38,10 @@ public enum Transports {
|
|||
final String threadName = t.getName();
|
||||
for (String s : Arrays.asList(
|
||||
LocalTransport.LOCAL_TRANSPORT_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
|
||||
if (threadName.contains(s)) {
|
||||
return true;
|
||||
|
|
|
@ -16,13 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,9 +33,12 @@ import java.nio.charset.StandardCharsets;
|
|||
final class ChannelBufferBytesReference implements BytesReference {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int size;
|
||||
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer) {
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer, int size) {
|
||||
this.buffer = buffer;
|
||||
this.size = size;
|
||||
assert size <= buffer.readableBytes() : "size[" + size +"] > " + buffer.readableBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -46,25 +48,24 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public int length() {
|
||||
return buffer.readableBytes();
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference slice(int from, int length) {
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length));
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamInput streamInput() {
|
||||
return ChannelBufferStreamInputFactory.create(buffer.duplicate());
|
||||
return new ChannelBufferStreamInput(buffer.duplicate(), size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
buffer.getBytes(buffer.readerIndex(), os, length());
|
||||
buffer.getBytes(buffer.readerIndex(), os, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
return copyBytesArray().toBytes();
|
||||
}
|
||||
|
@ -72,7 +73,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesArray toBytesArray() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
return copyBytesArray();
|
||||
}
|
||||
|
@ -111,7 +112,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesRef toBytesRef() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
|
@ -120,7 +121,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public BytesRef copyBytesRef() {
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
byte[] copy = new byte[size];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
return new BytesRef(copy);
|
||||
}
|
|
@ -22,16 +22,14 @@ package org.elasticsearch.transport.netty;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
|
||||
*/
|
||||
public class ChannelBufferStreamInput extends StreamInput {
|
||||
class ChannelBufferStreamInput extends StreamInput {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int startIndex;
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ChannelBufferStreamInputFactory {
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer) {
|
||||
return new ChannelBufferStreamInput(buffer, buffer.readableBytes());
|
||||
}
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer, int size) {
|
||||
return new ChannelBufferStreamInput(buffer, size);
|
||||
}
|
||||
}
|
|
@ -1,437 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.compress.NotCompressedException;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportSerializationException;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final ESLogger logger;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
|
||||
this.threadPool = transport.threadPool();
|
||||
this.threadContext = threadPool.getThreadContext();
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.logger = logger;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
Marker marker = new Marker(buffer);
|
||||
int size = marker.messageSizeWithRemainingHeaders();
|
||||
transportServiceAdapter.received(marker.messageSizeWithAllHeaders());
|
||||
|
||||
// we have additional bytes to read, outside of the header
|
||||
boolean hasMessageBytesToRead = marker.messageSize() != 0;
|
||||
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time
|
||||
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
|
||||
boolean success = false;
|
||||
try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
|
||||
long requestId = streamIn.readLong();
|
||||
byte status = streamIn.readByte();
|
||||
Version version = Version.fromId(streamIn.readInt());
|
||||
|
||||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
|
||||
Compressor compressor;
|
||||
try {
|
||||
compressor = CompressorFactory.compressor(NettyUtils.toBytesReference(buffer));
|
||||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
|
||||
.append("] content bytes out of [").append(buffer.readableBytes())
|
||||
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
|
||||
for (int i = 0; i < maxToRead; i++) {
|
||||
sb.append(buffer.getByte(offset + i)).append(",");
|
||||
}
|
||||
sb.append("]");
|
||||
throw new IllegalStateException(sb.toString());
|
||||
}
|
||||
streamIn = compressor.streamInput(streamIn);
|
||||
}
|
||||
if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) {
|
||||
throw new IllegalStateException("Received message from unsupported version: [" + version
|
||||
+ "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]");
|
||||
}
|
||||
streamIn.setVersion(version);
|
||||
if (TransportStatus.isRequest(status)) {
|
||||
threadContext.readHeaders(streamIn);
|
||||
handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
|
||||
} else {
|
||||
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
// ignore if its null, the adapter logs it
|
||||
if (handler != null) {
|
||||
if (TransportStatus.isError(status)) {
|
||||
handlerResponseError(streamIn, handler);
|
||||
} else {
|
||||
handleResponse(ctx.getChannel(), streamIn, handler);
|
||||
}
|
||||
marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
|
||||
}
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
try {
|
||||
if (success) {
|
||||
IOUtils.close(streamIn);
|
||||
} else {
|
||||
IOUtils.closeWhileHandlingException(streamIn);
|
||||
}
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(marker.expectedReaderIndex());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final TransportResponse response = handler.newInstance();
|
||||
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
response.remoteAddress();
|
||||
try {
|
||||
response.readFrom(buffer);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
//noinspection unchecked
|
||||
handler.handleResponse(response);
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
|
||||
Throwable error;
|
||||
try {
|
||||
error = buffer.readThrowable();
|
||||
} catch (Throwable e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
}
|
||||
|
||||
private void handleException(final TransportResponseHandler handler, Throwable error) {
|
||||
if (!(error instanceof RemoteTransportException)) {
|
||||
error = new RemoteTransportException(error.getMessage(), error);
|
||||
}
|
||||
final RemoteTransportException rtx = (RemoteTransportException) error;
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
|
||||
Version version) throws IOException {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final String action = buffer.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
NettyTransportChannel transportChannel = null;
|
||||
try {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
throw new ActionNotFoundTransportException(action);
|
||||
}
|
||||
if (reg.canTripCircuitBreaker()) {
|
||||
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
|
||||
} else {
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
|
||||
}
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, messageLengthBytes);
|
||||
final TransportRequest request = reg.newRequest();
|
||||
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
request.readFrom(buffer);
|
||||
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
|
||||
validateRequest(marker, buffer, requestId, action);
|
||||
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
|
||||
//noinspection unchecked
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
} else {
|
||||
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// the circuit breaker tripped
|
||||
if (transportChannel == null) {
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, 0);
|
||||
}
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e, action);
|
||||
logger.warn("Actual Exception", e1);
|
||||
}
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
// This template method is needed to inject custom error checking logic in tests.
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
marker.validateRequest(buffer, requestId, action);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
|
||||
class ResponseHandler implements Runnable {
|
||||
|
||||
private final TransportResponseHandler handler;
|
||||
private final TransportResponse response;
|
||||
|
||||
public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
|
||||
this.handler = handler;
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleResponse(response);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RequestHandler extends AbstractRunnable {
|
||||
private final RequestHandlerRegistry reg;
|
||||
private final TransportRequest request;
|
||||
private final NettyTransportChannel transportChannel;
|
||||
|
||||
public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
|
||||
this.reg = reg;
|
||||
this.request = request;
|
||||
this.transportChannel = transportChannel;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return reg.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction());
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal helper class to store characteristic offsets of a buffer during processing
|
||||
*/
|
||||
protected static final class Marker {
|
||||
private final ChannelBuffer buffer;
|
||||
private final int remainingMessageSize;
|
||||
private final int expectedReaderIndex;
|
||||
|
||||
public Marker(ChannelBuffer buffer) {
|
||||
this.buffer = buffer;
|
||||
// when this constructor is called, we have read already two parts of the message header: the marker bytes and the message
|
||||
// message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index for MESSAGE_LENGTH_SIZE bytes to read the
|
||||
// remaining message length again.
|
||||
this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE);
|
||||
this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes that have yet to be read from the buffer
|
||||
*/
|
||||
public int messageSizeWithRemainingHeaders() {
|
||||
return remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number in bytes for the message including all headers (even the ones that have been read from the buffer already)
|
||||
*/
|
||||
public int messageSizeWithAllHeaders() {
|
||||
return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes for the message itself (excluding all headers).
|
||||
*/
|
||||
public int messageSize() {
|
||||
return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the expected index of the buffer's reader after the message has been consumed entirely.
|
||||
*/
|
||||
public int expectedReaderIndex() {
|
||||
return expectedReaderIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a request has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The current request id.
|
||||
* @param action The currently executed action.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (request), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException(
|
||||
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a response has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The corresponding request id for this response.
|
||||
* @param handler The current response handler.
|
||||
* @param error Whether validate an error response.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateResponse(StreamInput stream, long requestId,
|
||||
TransportResponseHandler<?> handler, boolean error) throws IOException {
|
||||
// Check the entire message has been read
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
|
||||
+ handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (response), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
|
||||
+ "], handler [" + handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NettyHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
/**
|
||||
* The magic number (must be lower than 0) for a ping message. This is handled
|
||||
* specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}.
|
||||
*/
|
||||
public static final int PING_DATA_SIZE = -1;
|
||||
private final static ChannelBuffer pingHeader;
|
||||
static {
|
||||
pingHeader = ChannelBuffers.buffer(6);
|
||||
pingHeader.writeByte('E');
|
||||
pingHeader.writeByte('S');
|
||||
pingHeader.writeInt(PING_DATA_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* A ping header is same as regular header, just with -1 for the size of the message.
|
||||
*/
|
||||
public static ChannelBuffer pingHeader() {
|
||||
return pingHeader.duplicate();
|
||||
}
|
||||
|
||||
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
|
||||
int index = buffer.readerIndex();
|
||||
buffer.setByte(index, 'E');
|
||||
index += 1;
|
||||
buffer.setByte(index, 'S');
|
||||
index += 1;
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE);
|
||||
index += MESSAGE_LENGTH_SIZE;
|
||||
buffer.setLong(index, requestId);
|
||||
index += REQUEST_ID_SIZE;
|
||||
buffer.setByte(index, status);
|
||||
index += STATUS_SIZE;
|
||||
buffer.setInt(index, version.id);
|
||||
}
|
||||
}
|
|
@ -27,11 +27,11 @@ import org.jboss.netty.logging.AbstractInternalLogger;
|
|||
*
|
||||
*/
|
||||
@SuppressLoggerChecks(reason = "safely delegates to logger")
|
||||
public class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
final class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public NettyInternalESLogger(ESLogger logger) {
|
||||
NettyInternalESLogger(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class NettyInternalESLoggerFactory extends InternalLoggerFactory {
|
||||
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TcpTransportChannel;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
|
||||
NettyMessageChannelHandler(NettyTransport transport, String profileName) {
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
final ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
|
||||
try {
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
|
||||
BytesReference reference = NettyUtils.toBytesReference(buffer, remainingMessageSize);
|
||||
transport.messageReceived(reference, ctx.getChannel(), profileName, remoteAddress, remainingMessageSize);
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(expectedReaderIndex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,176 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseOptions;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class NettyTransportChannel implements TransportChannel {
|
||||
|
||||
private final NettyTransport transport;
|
||||
private final TransportServiceAdapter transportServiceAdapter;
|
||||
private final Version version;
|
||||
private final String action;
|
||||
private final Channel channel;
|
||||
private final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
|
||||
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.transportServiceAdapter = transportServiceAdapter;
|
||||
this.version = version;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.channel = channel;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
if (transport.compress) {
|
||||
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
|
||||
ReleasableBytesStreamOutput bStream = null;
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
|
||||
}
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
stream.close();
|
||||
|
||||
ReleasablePagedBytesReference bytes = bStream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
final TransportResponseOptions finalOptions = options;
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
|
||||
future.addListener(onResponseSentListener);
|
||||
} finally {
|
||||
if (!addedReleaseListener && bStream != null) {
|
||||
Releasables.close(bStream.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(
|
||||
transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
stream.writeThrowable(tx);
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
status = TransportStatus.setError(status);
|
||||
|
||||
BytesReference bytes = stream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, error);
|
||||
future.addListener(onResponseSentListener);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return "netty";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the underlying netty channel. This method is intended be used for access to netty to get additional
|
||||
* details when processing the request and may be used by plugins. Responses should be sent using the methods
|
||||
* defined in this class and not directly on the channel.
|
||||
* @return underlying netty channel
|
||||
*/
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,12 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
|
@ -93,10 +93,11 @@ public class NettyUtils {
|
|||
}
|
||||
|
||||
static {
|
||||
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
|
||||
InternalLoggerFactory.setDefaultFactory(new InternalLoggerFactory() {
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
|
||||
name = name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty.");
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -136,6 +137,13 @@ public class NettyUtils {
|
|||
* Wraps the given ChannelBuffer with a BytesReference
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer) {
|
||||
return new ChannelBufferBytesReference(channelBuffer);
|
||||
return toBytesReference(channelBuffer, channelBuffer.readableBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the given ChannelBuffer with a BytesReference of a given size
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer, int size) {
|
||||
return new ChannelBufferBytesReference(channelBuffer, size);
|
||||
}
|
||||
}
|
|
@ -17,8 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
@ -32,13 +33,14 @@ import org.jboss.netty.channel.ChannelState;
|
|||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
||||
|
||||
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
|
||||
final CounterMetric openChannelsMetric = new CounterMetric();
|
||||
|
@ -91,6 +93,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
|||
return totalChannelsMetric.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (Channel channel : openChannels) {
|
||||
channel.close().awaitUninterruptibly();
|
|
@ -19,107 +19,29 @@
|
|||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().bytes() * 0.9);
|
||||
final class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
if (buffer.readableBytes() < sizeHeaderLength) {
|
||||
try {
|
||||
boolean continueProcessing = TcpTransport.validateMessageHeader(NettyUtils.toBytesReference(buffer));
|
||||
buffer.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
return continueProcessing ? buffer : null;
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new TooLongFrameException(ex.getMessage(), ex);
|
||||
} catch (IllegalStateException ex) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int readerIndex = buffer.readerIndex();
|
||||
if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') {
|
||||
// special handling for what is probably HTTP
|
||||
if (bufferStartsWith(buffer, readerIndex, "GET ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "POST ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PUT ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "HEAD ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "DELETE ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "OPTIONS ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PATCH ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "TRACE ")) {
|
||||
|
||||
throw new HttpOnTransportException("This is not a HTTP port");
|
||||
}
|
||||
|
||||
// we have 6 readable bytes, show 4 (should be enough)
|
||||
throw new StreamCorruptedException("invalid internal transport message format, got ("
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 1) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 2) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")");
|
||||
}
|
||||
|
||||
int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE);
|
||||
if (dataLen == NettyHeader.PING_DATA_SIZE) {
|
||||
// discard the messages we read and continue, this is achieved by skipping the bytes
|
||||
// and returning null
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return null;
|
||||
}
|
||||
if (dataLen <= 0) {
|
||||
throw new StreamCorruptedException("invalid data length: " + dataLen);
|
||||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < dataLen + sizeHeaderLength) {
|
||||
return null;
|
||||
}
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private boolean bufferStartsWith(ChannelBuffer buffer, int readerIndex, String method) {
|
||||
char[] chars = method.toCharArray();
|
||||
for (int i = 0; i < chars.length; i++) {
|
||||
if (buffer.getByte(readerIndex + i) != chars[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper exception to mark an incoming connection as potentially being HTTP
|
||||
* so an appropriate error code can be returned
|
||||
*/
|
||||
public static class HttpOnTransportException extends ElasticsearchException {
|
||||
|
||||
public HttpOnTransportException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
|
||||
public HttpOnTransportException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
|||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.ActionTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -763,7 +764,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(122, null);
|
||||
ids.put(123, org.elasticsearch.indices.IndexAlreadyExistsException.class);
|
||||
ids.put(124, org.elasticsearch.script.Script.ScriptParseException.class);
|
||||
ids.put(125, org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class);
|
||||
ids.put(125, TcpTransport.HttpOnTransportException.class);
|
||||
ids.put(126, org.elasticsearch.index.mapper.MapperParsingException.class);
|
||||
ids.put(127, org.elasticsearch.search.SearchContextException.class);
|
||||
ids.put(128, org.elasticsearch.search.builder.SearchSourceBuilderException.class);
|
||||
|
|
|
@ -162,20 +162,6 @@ public class ChannelsTests extends ESTestCase {
|
|||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
|
||||
public void testWriteFromChannel() throws IOException {
|
||||
int length = randomIntBetween(1, randomBytes.length / 2);
|
||||
int offset = randomIntBetween(0, randomBytes.length - length);
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(randomBytes);
|
||||
ChannelBuffer source = new ByteBufferBackedChannelBuffer(byteBuffer);
|
||||
Channels.writeToChannel(source, offset, length, fileChannel);
|
||||
|
||||
BytesReference copyRef = new BytesArray(Channels.readFromFileChannel(fileChannel, 0, length));
|
||||
BytesReference sourceRef = new BytesArray(randomBytes, offset, length);
|
||||
|
||||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
class MockFileChannel extends FileChannel {
|
||||
|
||||
FileChannel delegate;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
|
@ -324,16 +324,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testVoidMessageCompressed() {
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
(request, channel) -> {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -17,13 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -32,11 +30,11 @@ import java.util.List;
|
|||
|
||||
import static java.net.InetAddress.getByName;
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.elasticsearch.transport.netty.NettyTransport.resolvePublishPort;
|
||||
import static org.elasticsearch.transport.TcpTransport.resolvePublishPort;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class NettyPublishPortTests extends ESTestCase {
|
||||
public class PublishPortTests extends ESTestCase {
|
||||
|
||||
public void testPublishPort() throws Exception {
|
||||
int boundPort = randomIntBetween(9000, 9100);
|
|
@ -17,17 +17,17 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
/** Unit tests for NettyTransport */
|
||||
public class NettyTransportTests extends ESTestCase {
|
||||
|
||||
/** Unit tests for TCPTransport */
|
||||
public class TCPTransportTests extends ESTestCase {
|
||||
|
||||
/** Test ipv4 host with a default port works */
|
||||
public void testParseV4DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -36,19 +36,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with a default port range works */
|
||||
public void testParseV4DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("127.0.0.1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with port works */
|
||||
public void testParseV4WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -57,7 +57,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with port range works */
|
||||
public void testParseV4WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -70,7 +70,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
|
||||
public void testParseV6UnBracketed() throws Exception {
|
||||
try {
|
||||
NettyTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
TcpTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
fail("should have gotten exception");
|
||||
} catch (IllegalArgumentException expected) {
|
||||
assertTrue(expected.getMessage().contains("must be bracketed"));
|
||||
|
@ -79,7 +79,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port works */
|
||||
public void testParseV6DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -88,19 +88,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port range works */
|
||||
public void testParseV6DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with port works */
|
||||
public void testParseV6WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -109,7 +109,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with port range works */
|
||||
public void testParseV6WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -118,10 +118,10 @@ public class NettyTransportTests extends ESTestCase {
|
|||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(2346, addresses[1].getPort());
|
||||
}
|
||||
|
||||
|
||||
/** Test per-address limit */
|
||||
public void testAddressLimit() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:100-200", "1000", 3);
|
||||
assertEquals(3, addresses.length);
|
||||
assertEquals(100, addresses[0].getPort());
|
||||
assertEquals(101, addresses[1].getPort());
|
|
@ -16,12 +16,13 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
|
@ -19,7 +19,6 @@
|
|||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
|
@ -33,6 +32,7 @@ import org.elasticsearch.test.transport.MockTransportService;
|
|||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -56,7 +56,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TransportSettings.PORT.getKey(), 0)
|
||||
.put("cluster.name", "test")
|
||||
.build();
|
||||
|
@ -89,12 +89,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
}
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
|
@ -137,15 +137,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
}).txGet();
|
||||
}
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
Releasables.close(serviceA, serviceB);
|
||||
terminate(threadPool);
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
|||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
|
@ -98,45 +99,24 @@ public class NettyTransportIT extends ESIntegTestCase {
|
|||
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, String profileName,
|
||||
StreamInput stream, long requestId, int messageLengthBytes, Version version,
|
||||
InetSocketAddress remoteAddress) throws IOException {
|
||||
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
|
||||
remoteAddress);
|
||||
channelProfileName = TransportSettings.DEFAULT_PROFILE;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
|
||||
return new ErrorPipelineFactory(this, name, groupSettings);
|
||||
}
|
||||
|
||||
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) {
|
||||
super(nettyTransport, name, groupSettings);
|
||||
this.logger = nettyTransport.logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = super.getPipeline();
|
||||
pipeline.replace("dispatcher", "dispatcher",
|
||||
new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
|
||||
|
||||
@Override
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
|
||||
int messageLengthBytes, Version version) throws IOException {
|
||||
String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version);
|
||||
channelProfileName = this.profileName;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
super.validateRequest(marker, buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
return pipeline;
|
||||
protected void validateRequest(StreamInput buffer, long requestId, String action)
|
||||
throws IOException {
|
||||
super.validateRequest(buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -30,6 +29,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.junit.Before;
|
||||
|
@ -58,7 +58,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -74,7 +74,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -91,7 +91,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -107,7 +107,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -125,7 +125,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -133,14 +133,13 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
|
||||
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
TcpTransport<?> transport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
transport.start();
|
||||
|
||||
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return nettyTransport;
|
||||
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return transport;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -6,14 +6,9 @@ The `plugins` command provides a view per node of running plugins. This informat
|
|||
[source,sh]
|
||||
------------------------------------------------------------------------------
|
||||
% curl 'localhost:9200/_cat/plugins?v'
|
||||
name component version type isolation url
|
||||
Abraxas discovery-azure 2.1.0-SNAPSHOT j x
|
||||
Abraxas lang-javascript 2.0.0-SNAPSHOT j x
|
||||
Abraxas marvel NA j/s x /_plugin/marvel/
|
||||
Abraxas lang-python 2.0.0-SNAPSHOT j x
|
||||
Abraxas inquisitor NA s /_plugin/inquisitor/
|
||||
Abraxas kopf 0.5.2 s /_plugin/kopf/
|
||||
Abraxas segmentspy NA s /_plugin/segmentspy/
|
||||
name component version description
|
||||
Abraxas discovery-gce 5.0.0 The Google Compute Engine (GCE) Discovery plugin allows to use GCE API for the unicast discovery mechanism.
|
||||
Abraxas lang-javascript 5.0.0 The JavaScript language plugin allows to have javascript as the language of scripts to execute.
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
We can tell quickly how many plugins per node we have and which versions.
|
||||
|
|
|
@ -21,8 +21,6 @@ package org.elasticsearch.percolator;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.mapper.Mapper;
|
||||
|
@ -39,8 +37,6 @@ import java.util.Map;
|
|||
|
||||
public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlugin {
|
||||
|
||||
public static final String NAME = "percolator";
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
public PercolatorPlugin(Settings settings) {
|
||||
|
@ -65,7 +61,7 @@ public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlug
|
|||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
|
||||
return Collections.singletonList(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -73,7 +69,4 @@ public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlug
|
|||
return Collections.singletonMap(PercolatorFieldMapper.CONTENT_TYPE, new PercolatorFieldMapper.TypeParser());
|
||||
}
|
||||
|
||||
static boolean transportClientMode(Settings settings) {
|
||||
return TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,202 +0,0 @@
|
|||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -1,202 +0,0 @@
|
|||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -1,202 +0,0 @@
|
|||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.search.SearchService;
|
|||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.transport.AssertingLocalTransport;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
@ -421,9 +422,9 @@ public final class InternalTestCluster extends TestCluster {
|
|||
// randomize netty settings
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(NettyTransport.WORKER_COUNT.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(NettyTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
|
||||
builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
|
@ -455,7 +456,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(NettyTransport.PING_SCHEDULE.getKey(), RandomInts.randomIntBetween(random, 100, 2000) + "ms");
|
||||
builder.put(TcpTransport.PING_SCHEDULE.getKey(), RandomInts.randomIntBetween(random, 100, 2000) + "ms");
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
|
|
Loading…
Reference in New Issue