Merge branch 'master' into pr/15724-gce-network-host-master
This commit is contained in:
commit
0c3ce1fac2
|
@ -1168,7 +1168,6 @@
|
|||
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportMultiPercolateAction.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportPercolateAction.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]TransportShardMultiPercolateAction.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]RestPercolateAction.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]MultiPercolatorIT.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]PercolatorIT.java" checks="LineLength" />
|
||||
<suppress files="modules[/\\]percolator[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]percolator[/\\]MultiPercolatorRequestTests.java" checks="LineLength" />
|
||||
|
@ -1180,10 +1179,10 @@
|
|||
<suppress files="plugins[/\\]analysis-kuromoji[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]JapaneseStopTokenFilterFactory.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]analysis-kuromoji[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]KuromojiAnalysisTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]analysis-phonetic[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]PhoneticTokenFilterFactory.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]azure[/\\]AbstractAzureTestCase.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureMinimumMasterNodesTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureSimpleTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureTwoStartedNodesTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]azure[/\\]AbstractAzureTestCase.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureMinimumMasterNodesTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureSimpleTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure-classic[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureTwoStartedNodesTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]aws[/\\]AbstractAwsTestCase.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ec2[/\\]AmazonEC2Mock.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-gce[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]gce[/\\]GceNetworkTests.java" checks="LineLength" />
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -496,7 +497,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
|
||||
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class,
|
||||
org.elasticsearch.search.SearchContextMissingException::new, 24),
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
|
||||
org.elasticsearch.script.GeneralScriptException::new, 25),
|
||||
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
|
||||
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
|
||||
|
@ -676,8 +677,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.indices.IndexAlreadyExistsException::new, 123),
|
||||
SCRIPT_PARSE_EXCEPTION(org.elasticsearch.script.Script.ScriptParseException.class,
|
||||
org.elasticsearch.script.Script.ScriptParseException::new, 124),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class,
|
||||
org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException::new, 125),
|
||||
HTTP_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpOnTransportException.class,
|
||||
TcpTransport.HttpOnTransportException::new, 125),
|
||||
MAPPER_PARSING_EXCEPTION(org.elasticsearch.index.mapper.MapperParsingException.class,
|
||||
org.elasticsearch.index.mapper.MapperParsingException::new, 126),
|
||||
SEARCH_CONTEXT_EXCEPTION(org.elasticsearch.search.SearchContextException.class,
|
||||
|
|
|
@ -50,8 +50,8 @@ import org.elasticsearch.plugins.PluginsService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
|
@ -107,7 +107,7 @@ public class TransportClient extends AbstractClient {
|
|||
|
||||
private PluginsService newPluginService(final Settings settings) {
|
||||
final Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
|
||||
.put(InternalSettingsPreparer.prepareSettings(settings))
|
||||
.put(NetworkService.NETWORK_SERVER.getKey(), false)
|
||||
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
@ -159,25 +158,6 @@ public final class Channels {
|
|||
return bytesRead;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel}
|
||||
*
|
||||
* @param source ChannelBuffer to copy from
|
||||
* @param sourceIndex index in <i>source</i> to start copying from
|
||||
* @param length how many bytes to copy
|
||||
* @param channel target GatheringByteChannel
|
||||
*/
|
||||
public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
|
||||
while (length > 0) {
|
||||
int written = source.getBytes(sourceIndex, channel, length);
|
||||
sourceIndex += written;
|
||||
length -= written;
|
||||
}
|
||||
assert length == 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
|
||||
*
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
|
||||
/**
|
||||
* A marker to not remove frame decoder from the resulting jar so plugins can use it.
|
||||
*/
|
||||
public class KeepFrameDecoder extends FrameDecoder {
|
||||
|
||||
public static final KeepFrameDecoder decoder = new KeepFrameDecoder();
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
/**
|
||||
* A channel listener that releases a {@link org.elasticsearch.common.lease.Releasable} when
|
||||
* the operation is complete.
|
||||
*/
|
||||
public class ReleaseChannelFutureListener implements ChannelFutureListener {
|
||||
|
||||
private final Releasable releasable;
|
||||
|
||||
public ReleaseChannelFutureListener(Releasable releasable) {
|
||||
this.releasable = releasable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
releasable.close();
|
||||
}
|
||||
}
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
@ -279,14 +280,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
TransportSettings.PUBLISH_PORT,
|
||||
TransportSettings.PORT,
|
||||
NettyTransport.WORKER_COUNT,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_REG,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
NettyTransport.CONNECTIONS_PER_NODE_PING,
|
||||
NettyTransport.PING_SCHEDULE,
|
||||
NettyTransport.TCP_BLOCKING_CLIENT,
|
||||
NettyTransport.TCP_CONNECT_TIMEOUT,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_STATE,
|
||||
TcpTransport.CONNECTIONS_PER_NODE_PING,
|
||||
TcpTransport.PING_SCHEDULE,
|
||||
TcpTransport.TCP_BLOCKING_CLIENT,
|
||||
TcpTransport.TCP_CONNECT_TIMEOUT,
|
||||
NettyTransport.NETTY_MAX_CUMULATION_BUFFER_CAPACITY,
|
||||
NettyTransport.NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
|
||||
NettyTransport.NETTY_RECEIVE_PREDICTOR_SIZE,
|
||||
|
@ -294,12 +295,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
NettyTransport.NETTY_RECEIVE_PREDICTOR_MAX,
|
||||
NetworkService.NETWORK_SERVER,
|
||||
NettyTransport.NETTY_BOSS_COUNT,
|
||||
NettyTransport.TCP_NO_DELAY,
|
||||
NettyTransport.TCP_KEEP_ALIVE,
|
||||
NettyTransport.TCP_REUSE_ADDRESS,
|
||||
NettyTransport.TCP_SEND_BUFFER_SIZE,
|
||||
NettyTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
NettyTransport.TCP_BLOCKING_SERVER,
|
||||
TcpTransport.TCP_NO_DELAY,
|
||||
TcpTransport.TCP_KEEP_ALIVE,
|
||||
TcpTransport.TCP_REUSE_ADDRESS,
|
||||
TcpTransport.TCP_SEND_BUFFER_SIZE,
|
||||
TcpTransport.TCP_RECEIVE_BUFFER_SIZE,
|
||||
TcpTransport.TCP_BLOCKING_SERVER,
|
||||
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
||||
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.http.netty;
|
||||
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
|
|
|
@ -24,8 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
|
||||
|
@ -128,7 +127,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
}
|
||||
|
||||
if (content instanceof Releasable) {
|
||||
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
|
||||
future.addListener((x) -> ((Releasable)content).close());
|
||||
addedReleaseListener = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.http.netty;
|
|||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
|
|
@ -24,8 +24,8 @@ import com.carrotsearch.hppc.IntSet;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.elasticsearch.transport.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TcpHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
|
||||
output.writeByte((byte)'E');
|
||||
output.writeByte((byte)'S');
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
output.writeInt(messageSize - TcpHeader.MARKER_BYTES_SIZE - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
output.writeLong(requestId);
|
||||
output.writeByte(status);
|
||||
output.writeInt(version.id);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public final class TcpTransportChannel<Channel> implements TransportChannel {
|
||||
private final TcpTransport<Channel> transport;
|
||||
protected final Version version;
|
||||
protected final String action;
|
||||
protected final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
private final String channelType;
|
||||
private final Channel channel;
|
||||
|
||||
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.version = version;
|
||||
this.channel = channel;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
this.channelType = channelType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
transport.sendResponse(version, channel, response, requestId, action, options);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
transport.sendErrorResponse(version, channel, error, requestId, action);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.getInFlightRequestBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getChannelType() {
|
||||
return channelType;
|
||||
}
|
||||
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -94,4 +96,9 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
long serverOpen();
|
||||
|
||||
List<String> getLocalAddresses();
|
||||
|
||||
default CircuitBreaker getInFlightRequestBreaker() {
|
||||
return new NoopCircuitBreaker("in-flight-noop");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -39,10 +38,10 @@ public enum Transports {
|
|||
final String threadName = t.getName();
|
||||
for (String s : Arrays.asList(
|
||||
LocalTransport.LOCAL_TRANSPORT_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
NettyTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
NettyTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
|
||||
TcpTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
|
||||
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
|
||||
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
|
||||
if (threadName.contains(s)) {
|
||||
return true;
|
||||
|
|
|
@ -16,13 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,9 +33,12 @@ import java.nio.charset.StandardCharsets;
|
|||
final class ChannelBufferBytesReference implements BytesReference {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int size;
|
||||
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer) {
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer, int size) {
|
||||
this.buffer = buffer;
|
||||
this.size = size;
|
||||
assert size <= buffer.readableBytes() : "size[" + size +"] > " + buffer.readableBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -46,25 +48,24 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public int length() {
|
||||
return buffer.readableBytes();
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference slice(int from, int length) {
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length));
|
||||
return new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex() + from, length), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamInput streamInput() {
|
||||
return ChannelBufferStreamInputFactory.create(buffer.duplicate());
|
||||
return new ChannelBufferStreamInput(buffer.duplicate(), size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
buffer.getBytes(buffer.readerIndex(), os, length());
|
||||
buffer.getBytes(buffer.readerIndex(), os, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
return copyBytesArray().toBytes();
|
||||
}
|
||||
|
@ -72,7 +73,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesArray toBytesArray() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
return copyBytesArray();
|
||||
}
|
||||
|
@ -111,7 +112,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
@Override
|
||||
public BytesRef toBytesRef() {
|
||||
if (buffer.hasArray()) {
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
return new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
|
||||
}
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
|
@ -120,7 +121,7 @@ final class ChannelBufferBytesReference implements BytesReference {
|
|||
|
||||
@Override
|
||||
public BytesRef copyBytesRef() {
|
||||
byte[] copy = new byte[buffer.readableBytes()];
|
||||
byte[] copy = new byte[size];
|
||||
buffer.getBytes(buffer.readerIndex(), copy);
|
||||
return new BytesRef(copy);
|
||||
}
|
|
@ -22,16 +22,14 @@ package org.elasticsearch.transport.netty;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
|
||||
*/
|
||||
public class ChannelBufferStreamInput extends StreamInput {
|
||||
class ChannelBufferStreamInput extends StreamInput {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
private final int startIndex;
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ChannelBufferStreamInputFactory {
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer) {
|
||||
return new ChannelBufferStreamInput(buffer, buffer.readableBytes());
|
||||
}
|
||||
|
||||
public static StreamInput create(ChannelBuffer buffer, int size) {
|
||||
return new ChannelBufferStreamInput(buffer, size);
|
||||
}
|
||||
}
|
|
@ -1,437 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.compress.NotCompressedException;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportSerializationException;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final ESLogger logger;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
|
||||
this.threadPool = transport.threadPool();
|
||||
this.threadContext = threadPool.getThreadContext();
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.logger = logger;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
Marker marker = new Marker(buffer);
|
||||
int size = marker.messageSizeWithRemainingHeaders();
|
||||
transportServiceAdapter.received(marker.messageSizeWithAllHeaders());
|
||||
|
||||
// we have additional bytes to read, outside of the header
|
||||
boolean hasMessageBytesToRead = marker.messageSize() != 0;
|
||||
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time
|
||||
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
|
||||
boolean success = false;
|
||||
try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
|
||||
long requestId = streamIn.readLong();
|
||||
byte status = streamIn.readByte();
|
||||
Version version = Version.fromId(streamIn.readInt());
|
||||
|
||||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
|
||||
Compressor compressor;
|
||||
try {
|
||||
compressor = CompressorFactory.compressor(NettyUtils.toBytesReference(buffer));
|
||||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
|
||||
.append("] content bytes out of [").append(buffer.readableBytes())
|
||||
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
|
||||
for (int i = 0; i < maxToRead; i++) {
|
||||
sb.append(buffer.getByte(offset + i)).append(",");
|
||||
}
|
||||
sb.append("]");
|
||||
throw new IllegalStateException(sb.toString());
|
||||
}
|
||||
streamIn = compressor.streamInput(streamIn);
|
||||
}
|
||||
if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) {
|
||||
throw new IllegalStateException("Received message from unsupported version: [" + version
|
||||
+ "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]");
|
||||
}
|
||||
streamIn.setVersion(version);
|
||||
if (TransportStatus.isRequest(status)) {
|
||||
threadContext.readHeaders(streamIn);
|
||||
handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
|
||||
} else {
|
||||
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
|
||||
// ignore if its null, the adapter logs it
|
||||
if (handler != null) {
|
||||
if (TransportStatus.isError(status)) {
|
||||
handlerResponseError(streamIn, handler);
|
||||
} else {
|
||||
handleResponse(ctx.getChannel(), streamIn, handler);
|
||||
}
|
||||
marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
|
||||
}
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
try {
|
||||
if (success) {
|
||||
IOUtils.close(streamIn);
|
||||
} else {
|
||||
IOUtils.closeWhileHandlingException(streamIn);
|
||||
}
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(marker.expectedReaderIndex());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final TransportResponse response = handler.newInstance();
|
||||
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
response.remoteAddress();
|
||||
try {
|
||||
response.readFrom(buffer);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
//noinspection unchecked
|
||||
handler.handleResponse(response);
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
|
||||
Throwable error;
|
||||
try {
|
||||
error = buffer.readThrowable();
|
||||
} catch (Throwable e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
}
|
||||
|
||||
private void handleException(final TransportResponseHandler handler, Throwable error) {
|
||||
if (!(error instanceof RemoteTransportException)) {
|
||||
error = new RemoteTransportException(error.getMessage(), error);
|
||||
}
|
||||
final RemoteTransportException rtx = (RemoteTransportException) error;
|
||||
if (ThreadPool.Names.SAME.equals(handler.executor())) {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Throwable e) {
|
||||
logger.error("failed to handle exception response [{}]", e, handler);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
|
||||
Version version) throws IOException {
|
||||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final String action = buffer.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
NettyTransportChannel transportChannel = null;
|
||||
try {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
throw new ActionNotFoundTransportException(action);
|
||||
}
|
||||
if (reg.canTripCircuitBreaker()) {
|
||||
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
|
||||
} else {
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
|
||||
}
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, messageLengthBytes);
|
||||
final TransportRequest request = reg.newRequest();
|
||||
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
request.readFrom(buffer);
|
||||
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
|
||||
validateRequest(marker, buffer, requestId, action);
|
||||
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
|
||||
//noinspection unchecked
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
} else {
|
||||
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// the circuit breaker tripped
|
||||
if (transportChannel == null) {
|
||||
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName, 0);
|
||||
}
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e, action);
|
||||
logger.warn("Actual Exception", e1);
|
||||
}
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
// This template method is needed to inject custom error checking logic in tests.
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
marker.validateRequest(buffer, requestId, action);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
|
||||
class ResponseHandler implements Runnable {
|
||||
|
||||
private final TransportResponseHandler handler;
|
||||
private final TransportResponse response;
|
||||
|
||||
public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
|
||||
this.handler = handler;
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.handleResponse(response);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RequestHandler extends AbstractRunnable {
|
||||
private final RequestHandlerRegistry reg;
|
||||
private final TransportRequest request;
|
||||
private final NettyTransportChannel transportChannel;
|
||||
|
||||
public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
|
||||
this.reg = reg;
|
||||
this.request = request;
|
||||
this.transportChannel = transportChannel;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
reg.processMessageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return reg.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction());
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal helper class to store characteristic offsets of a buffer during processing
|
||||
*/
|
||||
protected static final class Marker {
|
||||
private final ChannelBuffer buffer;
|
||||
private final int remainingMessageSize;
|
||||
private final int expectedReaderIndex;
|
||||
|
||||
public Marker(ChannelBuffer buffer) {
|
||||
this.buffer = buffer;
|
||||
// when this constructor is called, we have read already two parts of the message header: the marker bytes and the message
|
||||
// message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index for MESSAGE_LENGTH_SIZE bytes to read the
|
||||
// remaining message length again.
|
||||
this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE);
|
||||
this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes that have yet to be read from the buffer
|
||||
*/
|
||||
public int messageSizeWithRemainingHeaders() {
|
||||
return remainingMessageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number in bytes for the message including all headers (even the ones that have been read from the buffer already)
|
||||
*/
|
||||
public int messageSizeWithAllHeaders() {
|
||||
return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes for the message itself (excluding all headers).
|
||||
*/
|
||||
public int messageSize() {
|
||||
return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the expected index of the buffer's reader after the message has been consumed entirely.
|
||||
*/
|
||||
public int expectedReaderIndex() {
|
||||
return expectedReaderIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a request has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The current request id.
|
||||
* @param action The currently executed action.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (request), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException(
|
||||
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a response has been fully read (not too few bytes but also not too many bytes).
|
||||
*
|
||||
* @param stream A stream that is associated with the buffer that is tracked by this marker.
|
||||
* @param requestId The corresponding request id for this response.
|
||||
* @param handler The current response handler.
|
||||
* @param error Whether validate an error response.
|
||||
* @throws IOException Iff the stream could not be read.
|
||||
* @throws IllegalStateException Iff the request has not been fully read.
|
||||
*/
|
||||
public void validateResponse(StreamInput stream, long requestId,
|
||||
TransportResponseHandler<?> handler, boolean error) throws IOException {
|
||||
// Check the entire message has been read
|
||||
final int nextByte = stream.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
|
||||
+ handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message is fully read (response), yet there are "
|
||||
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedReaderIndex) {
|
||||
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
|
||||
+ "], handler [" + handler + "], error [" + error + "]; resetting");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NettyHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
public static final int REQUEST_ID_SIZE = 8;
|
||||
|
||||
public static final int STATUS_SIZE = 1;
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
/**
|
||||
* The magic number (must be lower than 0) for a ping message. This is handled
|
||||
* specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}.
|
||||
*/
|
||||
public static final int PING_DATA_SIZE = -1;
|
||||
private final static ChannelBuffer pingHeader;
|
||||
static {
|
||||
pingHeader = ChannelBuffers.buffer(6);
|
||||
pingHeader.writeByte('E');
|
||||
pingHeader.writeByte('S');
|
||||
pingHeader.writeInt(PING_DATA_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* A ping header is same as regular header, just with -1 for the size of the message.
|
||||
*/
|
||||
public static ChannelBuffer pingHeader() {
|
||||
return pingHeader.duplicate();
|
||||
}
|
||||
|
||||
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
|
||||
int index = buffer.readerIndex();
|
||||
buffer.setByte(index, 'E');
|
||||
index += 1;
|
||||
buffer.setByte(index, 'S');
|
||||
index += 1;
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE);
|
||||
index += MESSAGE_LENGTH_SIZE;
|
||||
buffer.setLong(index, requestId);
|
||||
index += REQUEST_ID_SIZE;
|
||||
buffer.setByte(index, status);
|
||||
index += STATUS_SIZE;
|
||||
buffer.setInt(index, version.id);
|
||||
}
|
||||
}
|
|
@ -27,11 +27,11 @@ import org.jboss.netty.logging.AbstractInternalLogger;
|
|||
*
|
||||
*/
|
||||
@SuppressLoggerChecks(reason = "safely delegates to logger")
|
||||
public class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
final class NettyInternalESLogger extends AbstractInternalLogger {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public NettyInternalESLogger(ESLogger logger) {
|
||||
NettyInternalESLogger(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class NettyInternalESLoggerFactory extends InternalLoggerFactory {
|
||||
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TcpTransportChannel;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.WriteCompletionEvent;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
* to the relevant action.
|
||||
*/
|
||||
class NettyMessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
protected final TransportServiceAdapter transportServiceAdapter;
|
||||
protected final NettyTransport transport;
|
||||
protected final String profileName;
|
||||
|
||||
NettyMessageChannelHandler(NettyTransport transport, String profileName) {
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
this.transport = transport;
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
|
||||
transportServiceAdapter.sent(e.getWrittenAmount());
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Transports.assertTransportThread();
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
final ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
|
||||
try {
|
||||
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
|
||||
// buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size
|
||||
BytesReference reference = NettyUtils.toBytesReference(buffer, remainingMessageSize);
|
||||
transport.messageReceived(reference, ctx.getChannel(), profileName, remoteAddress, remainingMessageSize);
|
||||
} finally {
|
||||
// Set the expected position of the buffer, no matter what happened
|
||||
buffer.readerIndex(expectedReaderIndex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
transport.exceptionCaught(ctx, e);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,176 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseOptions;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.support.TransportStatus;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class NettyTransportChannel implements TransportChannel {
|
||||
|
||||
private final NettyTransport transport;
|
||||
private final TransportServiceAdapter transportServiceAdapter;
|
||||
private final Version version;
|
||||
private final String action;
|
||||
private final Channel channel;
|
||||
private final long requestId;
|
||||
private final String profileName;
|
||||
private final long reservedBytes;
|
||||
private final AtomicBoolean released = new AtomicBoolean();
|
||||
|
||||
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
|
||||
long requestId, Version version, String profileName, long reservedBytes) {
|
||||
this.transportServiceAdapter = transportServiceAdapter;
|
||||
this.version = version;
|
||||
this.transport = transport;
|
||||
this.action = action;
|
||||
this.channel = channel;
|
||||
this.requestId = requestId;
|
||||
this.profileName = profileName;
|
||||
this.reservedBytes = reservedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String action() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
sendResponse(response, TransportResponseOptions.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
release();
|
||||
if (transport.compress) {
|
||||
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
|
||||
ReleasableBytesStreamOutput bStream = null;
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
|
||||
bStream.skip(NettyHeader.HEADER_SIZE);
|
||||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
|
||||
}
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
stream.close();
|
||||
|
||||
ReleasablePagedBytesReference bytes = bStream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
future.addListener(listener);
|
||||
addedReleaseListener = true;
|
||||
final TransportResponseOptions finalOptions = options;
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
|
||||
future.addListener(onResponseSentListener);
|
||||
} finally {
|
||||
if (!addedReleaseListener && bStream != null) {
|
||||
Releasables.close(bStream.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Throwable error) throws IOException {
|
||||
release();
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(
|
||||
transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
stream.writeThrowable(tx);
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
status = TransportStatus.setError(status);
|
||||
|
||||
BytesReference bytes = stream.bytes();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
f -> transportServiceAdapter.onResponseSent(requestId, action, error);
|
||||
future.addListener(onResponseSentListener);
|
||||
}
|
||||
|
||||
private void release() {
|
||||
// attempt to release once atomically
|
||||
if (released.compareAndSet(false, true) == false) {
|
||||
throw new IllegalStateException("reserved bytes are already released");
|
||||
}
|
||||
transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return "netty";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the underlying netty channel. This method is intended be used for access to netty to get additional
|
||||
* details when processing the request and may be used by plugins. Responses should be sent using the methods
|
||||
* defined in this class and not directly on the channel.
|
||||
* @return underlying netty channel
|
||||
*/
|
||||
public Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,12 +16,12 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
|
@ -93,10 +93,11 @@ public class NettyUtils {
|
|||
}
|
||||
|
||||
static {
|
||||
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
|
||||
InternalLoggerFactory.setDefaultFactory(new InternalLoggerFactory() {
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
|
||||
name = name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty.");
|
||||
return new NettyInternalESLogger(Loggers.getLogger(name));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -136,6 +137,13 @@ public class NettyUtils {
|
|||
* Wraps the given ChannelBuffer with a BytesReference
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer) {
|
||||
return new ChannelBufferBytesReference(channelBuffer);
|
||||
return toBytesReference(channelBuffer, channelBuffer.readableBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the given ChannelBuffer with a BytesReference of a given size
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer, int size) {
|
||||
return new ChannelBufferBytesReference(channelBuffer, size);
|
||||
}
|
||||
}
|
|
@ -17,8 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
@ -32,13 +33,14 @@ import org.jboss.netty.channel.ChannelState;
|
|||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
||||
public class OpenChannelsHandler implements ChannelUpstreamHandler, Releasable {
|
||||
|
||||
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
|
||||
final CounterMetric openChannelsMetric = new CounterMetric();
|
||||
|
@ -91,6 +93,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
|||
return totalChannelsMetric.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (Channel channel : openChannels) {
|
||||
channel.close().awaitUninterruptibly();
|
|
@ -19,107 +19,29 @@
|
|||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpHeader;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().bytes() * 0.9);
|
||||
final class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
|
||||
if (buffer.readableBytes() < sizeHeaderLength) {
|
||||
try {
|
||||
boolean continueProcessing = TcpTransport.validateMessageHeader(NettyUtils.toBytesReference(buffer));
|
||||
buffer.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||
return continueProcessing ? buffer : null;
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new TooLongFrameException(ex.getMessage(), ex);
|
||||
} catch (IllegalStateException ex) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int readerIndex = buffer.readerIndex();
|
||||
if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') {
|
||||
// special handling for what is probably HTTP
|
||||
if (bufferStartsWith(buffer, readerIndex, "GET ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "POST ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PUT ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "HEAD ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "DELETE ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "OPTIONS ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "PATCH ") ||
|
||||
bufferStartsWith(buffer, readerIndex, "TRACE ")) {
|
||||
|
||||
throw new HttpOnTransportException("This is not a HTTP port");
|
||||
}
|
||||
|
||||
// we have 6 readable bytes, show 4 (should be enough)
|
||||
throw new StreamCorruptedException("invalid internal transport message format, got ("
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 1) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 2) & 0xFF) + ","
|
||||
+ Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")");
|
||||
}
|
||||
|
||||
int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE);
|
||||
if (dataLen == NettyHeader.PING_DATA_SIZE) {
|
||||
// discard the messages we read and continue, this is achieved by skipping the bytes
|
||||
// and returning null
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return null;
|
||||
}
|
||||
if (dataLen <= 0) {
|
||||
throw new StreamCorruptedException("invalid data length: " + dataLen);
|
||||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < dataLen + sizeHeaderLength) {
|
||||
return null;
|
||||
}
|
||||
buffer.skipBytes(sizeHeaderLength);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private boolean bufferStartsWith(ChannelBuffer buffer, int readerIndex, String method) {
|
||||
char[] chars = method.toCharArray();
|
||||
for (int i = 0; i < chars.length; i++) {
|
||||
if (buffer.getByte(readerIndex + i) != chars[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper exception to mark an incoming connection as potentially being HTTP
|
||||
* so an appropriate error code can be returned
|
||||
*/
|
||||
public static class HttpOnTransportException extends ElasticsearchException {
|
||||
|
||||
public HttpOnTransportException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
|
||||
public HttpOnTransportException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
|||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.ActionTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -763,7 +764,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(122, null);
|
||||
ids.put(123, org.elasticsearch.indices.IndexAlreadyExistsException.class);
|
||||
ids.put(124, org.elasticsearch.script.Script.ScriptParseException.class);
|
||||
ids.put(125, org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.HttpOnTransportException.class);
|
||||
ids.put(125, TcpTransport.HttpOnTransportException.class);
|
||||
ids.put(126, org.elasticsearch.index.mapper.MapperParsingException.class);
|
||||
ids.put(127, org.elasticsearch.search.SearchContextException.class);
|
||||
ids.put(128, org.elasticsearch.search.builder.SearchSourceBuilderException.class);
|
||||
|
|
|
@ -162,20 +162,6 @@ public class ChannelsTests extends ESTestCase {
|
|||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
|
||||
public void testWriteFromChannel() throws IOException {
|
||||
int length = randomIntBetween(1, randomBytes.length / 2);
|
||||
int offset = randomIntBetween(0, randomBytes.length - length);
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(randomBytes);
|
||||
ChannelBuffer source = new ByteBufferBackedChannelBuffer(byteBuffer);
|
||||
Channels.writeToChannel(source, offset, length, fileChannel);
|
||||
|
||||
BytesReference copyRef = new BytesArray(Channels.readFromFileChannel(fileChannel, 0, length));
|
||||
BytesReference sourceRef = new BytesArray(randomBytes, offset, length);
|
||||
|
||||
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
|
||||
}
|
||||
|
||||
class MockFileChannel extends FileChannel {
|
||||
|
||||
FileChannel delegate;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
|
@ -324,16 +324,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testVoidMessageCompressed() {
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
(request, channel) -> {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -17,13 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -32,11 +30,11 @@ import java.util.List;
|
|||
|
||||
import static java.net.InetAddress.getByName;
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.elasticsearch.transport.netty.NettyTransport.resolvePublishPort;
|
||||
import static org.elasticsearch.transport.TcpTransport.resolvePublishPort;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class NettyPublishPortTests extends ESTestCase {
|
||||
public class PublishPortTests extends ESTestCase {
|
||||
|
||||
public void testPublishPort() throws Exception {
|
||||
int boundPort = randomIntBetween(9000, 9100);
|
|
@ -17,17 +17,17 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
/** Unit tests for NettyTransport */
|
||||
public class NettyTransportTests extends ESTestCase {
|
||||
|
||||
/** Unit tests for TCPTransport */
|
||||
public class TCPTransportTests extends ESTestCase {
|
||||
|
||||
/** Test ipv4 host with a default port works */
|
||||
public void testParseV4DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -36,19 +36,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with a default port range works */
|
||||
public void testParseV4DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("127.0.0.1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with port works */
|
||||
public void testParseV4WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -57,7 +57,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with port range works */
|
||||
public void testParseV4WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
|
@ -70,7 +70,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
|
||||
public void testParseV6UnBracketed() throws Exception {
|
||||
try {
|
||||
NettyTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
TcpTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
fail("should have gotten exception");
|
||||
} catch (IllegalArgumentException expected) {
|
||||
assertTrue(expected.getMessage().contains("must be bracketed"));
|
||||
|
@ -79,7 +79,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port works */
|
||||
public void testParseV6DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -88,19 +88,19 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port range works */
|
||||
public void testParseV6DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
|
||||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with port works */
|
||||
public void testParseV6WithPort() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -109,7 +109,7 @@ public class NettyTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with port range works */
|
||||
public void testParseV6WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
|
@ -118,10 +118,10 @@ public class NettyTransportTests extends ESTestCase {
|
|||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(2346, addresses[1].getPort());
|
||||
}
|
||||
|
||||
|
||||
/** Test per-address limit */
|
||||
public void testAddressLimit() throws Exception {
|
||||
TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:100-200", "1000", 3);
|
||||
assertEquals(3, addresses.length);
|
||||
assertEquals(100, addresses[0].getPort());
|
||||
assertEquals(101, addresses[1].getPort());
|
|
@ -16,12 +16,13 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.transport.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
|
@ -19,7 +19,6 @@
|
|||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
|
@ -33,6 +32,7 @@ import org.elasticsearch.test.transport.MockTransportService;
|
|||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -56,7 +56,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TransportSettings.PORT.getKey(), 0)
|
||||
.put("cluster.name", "test")
|
||||
.build();
|
||||
|
@ -89,12 +89,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100L));
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
|
||||
}
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
|
@ -137,15 +137,12 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
}).txGet();
|
||||
}
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200L));
|
||||
}
|
||||
assertBusy(() -> {
|
||||
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
|
||||
});
|
||||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
|
||||
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
|
||||
|
||||
Releasables.close(serviceA, serviceB);
|
||||
terminate(threadPool);
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
|||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
|
@ -98,45 +99,24 @@ public class NettyTransportIT extends ESIntegTestCase {
|
|||
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
protected String handleRequest(Channel channel, String profileName,
|
||||
StreamInput stream, long requestId, int messageLengthBytes, Version version,
|
||||
InetSocketAddress remoteAddress) throws IOException {
|
||||
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
|
||||
remoteAddress);
|
||||
channelProfileName = TransportSettings.DEFAULT_PROFILE;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
|
||||
return new ErrorPipelineFactory(this, name, groupSettings);
|
||||
}
|
||||
|
||||
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) {
|
||||
super(nettyTransport, name, groupSettings);
|
||||
this.logger = nettyTransport.logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = super.getPipeline();
|
||||
pipeline.replace("dispatcher", "dispatcher",
|
||||
new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
|
||||
|
||||
@Override
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
|
||||
int messageLengthBytes, Version version) throws IOException {
|
||||
String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version);
|
||||
channelProfileName = this.profileName;
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
|
||||
super.validateRequest(marker, buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
return pipeline;
|
||||
protected void validateRequest(StreamInput buffer, long requestId, String action)
|
||||
throws IOException {
|
||||
super.validateRequest(buffer, requestId, action);
|
||||
String error = threadPool.getThreadContext().getHeader("ERROR");
|
||||
if (error != null) {
|
||||
throw new ElasticsearchException(error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -30,6 +29,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.junit.Before;
|
||||
|
@ -58,7 +58,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -74,7 +74,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(1, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -91,7 +91,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -107,7 +107,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -125,7 +125,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool("tst");
|
||||
try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
|
||||
try (TcpTransport<?> transport = startTransport(settings, threadPool)) {
|
||||
assertEquals(0, transport.profileBoundAddresses().size());
|
||||
assertEquals(1, transport.boundAddress().boundAddresses().length);
|
||||
} finally {
|
||||
|
@ -133,14 +133,13 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
|
||||
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
TcpTransport<?> transport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
transport.start();
|
||||
|
||||
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return nettyTransport;
|
||||
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
|
||||
return transport;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -63,7 +63,7 @@ DEFAULT_PLUGINS = ["analysis-icu",
|
|||
"analysis-phonetic",
|
||||
"analysis-smartcn",
|
||||
"analysis-stempel",
|
||||
"discovery-azure",
|
||||
"discovery-azure-classic",
|
||||
"discovery-ec2",
|
||||
"discovery-gce",
|
||||
"ingest-attachment",
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
[[discovery-azure]]
|
||||
=== Azure Discovery Plugin
|
||||
[[discovery-azure-classic]]
|
||||
=== Azure Classic Discovery Plugin
|
||||
|
||||
The Azure Discovery plugin uses the Azure API for unicast discovery.
|
||||
The Azure Classic Discovery plugin uses the Azure Classic API for unicast discovery.
|
||||
|
||||
[[discovery-azure-install]]
|
||||
// TODO: Link to ARM plugin when ready
|
||||
// See issue https://github.com/elastic/elasticsearch/issues/19146
|
||||
deprecated[5.0.0, Use coming Azure ARM Discovery plugin instead]
|
||||
|
||||
[[discovery-azure-classic-install]]
|
||||
[float]
|
||||
==== Installation
|
||||
|
||||
|
@ -11,13 +15,13 @@ This plugin can be installed using the plugin manager:
|
|||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/elasticsearch-plugin install discovery-azure
|
||||
sudo bin/elasticsearch-plugin install discovery-azure-classic
|
||||
----------------------------------------------------------------
|
||||
|
||||
The plugin must be installed on every node in the cluster, and each node must
|
||||
be restarted after installation.
|
||||
|
||||
[[discovery-azure-remove]]
|
||||
[[discovery-azure-classic-remove]]
|
||||
[float]
|
||||
==== Removal
|
||||
|
||||
|
@ -25,12 +29,12 @@ The plugin can be removed with the following command:
|
|||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/elasticsearch-plugin remove discovery-azure
|
||||
sudo bin/elasticsearch-plugin remove discovery-azure-classic
|
||||
----------------------------------------------------------------
|
||||
|
||||
The node must be stopped before removing the plugin.
|
||||
|
||||
[[discovery-azure-usage]]
|
||||
[[discovery-azure-classic-usage]]
|
||||
==== Azure Virtual Machine Discovery
|
||||
|
||||
Azure VM discovery allows to use the azure APIs to perform automatic discovery (similar to multicast in non hostile
|
||||
|
@ -64,7 +68,7 @@ You can use {ref}/modules-network.html[core network host settings]. For example
|
|||
|
||||
==============================================
|
||||
|
||||
[[discovery-azure-short]]
|
||||
[[discovery-azure-classic-short]]
|
||||
===== How to start (short story)
|
||||
|
||||
* Create Azure instances
|
||||
|
@ -73,7 +77,7 @@ You can use {ref}/modules-network.html[core network host settings]. For example
|
|||
* Modify `elasticsearch.yml` file
|
||||
* Start Elasticsearch
|
||||
|
||||
[[discovery-azure-settings]]
|
||||
[[discovery-azure-classic-settings]]
|
||||
===== Azure credential API settings
|
||||
|
||||
The following are a list of settings that can further control the credential API:
|
||||
|
@ -100,7 +104,7 @@ The following are a list of settings that can further control the credential API
|
|||
your_azure_cloud_service_name
|
||||
|
||||
|
||||
[[discovery-azure-settings-advanced]]
|
||||
[[discovery-azure-classic-settings-advanced]]
|
||||
===== Advanced settings
|
||||
|
||||
The following are a list of settings that can further control the discovery:
|
||||
|
@ -143,7 +147,7 @@ discovery:
|
|||
slot: production
|
||||
----
|
||||
|
||||
[[discovery-azure-long]]
|
||||
[[discovery-azure-classic-long]]
|
||||
==== Setup process for Azure Discovery
|
||||
|
||||
We will expose here one strategy which is to hide our Elasticsearch cluster from outside.
|
||||
|
@ -153,7 +157,7 @@ other. That means that with this mode, you can use elasticsearch unicast
|
|||
discovery to build a cluster, using the Azure API to retrieve information
|
||||
about your nodes.
|
||||
|
||||
[[discovery-azure-long-prerequisites]]
|
||||
[[discovery-azure-classic-long-prerequisites]]
|
||||
===== Prerequisites
|
||||
|
||||
Before starting, you need to have:
|
||||
|
@ -243,7 +247,7 @@ azure account download
|
|||
azure account import /tmp/azure.publishsettings
|
||||
----
|
||||
|
||||
[[discovery-azure-long-instance]]
|
||||
[[discovery-azure-classic-long-instance]]
|
||||
===== Creating your first instance
|
||||
|
||||
You need to have a storage account available. Check http://www.windowsazure.com/en-us/develop/net/how-to-guides/blob-storage/#create-account[Azure Blob Storage documentation]
|
||||
|
@ -396,7 +400,7 @@ This command should give you a JSON result:
|
|||
}
|
||||
----
|
||||
|
||||
[[discovery-azure-long-plugin]]
|
||||
[[discovery-azure-classic-long-plugin]]
|
||||
===== Install elasticsearch cloud azure plugin
|
||||
|
||||
[source,sh]
|
||||
|
@ -405,7 +409,7 @@ This command should give you a JSON result:
|
|||
sudo service elasticsearch stop
|
||||
|
||||
# Install the plugin
|
||||
sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install discovery-azure
|
||||
sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install discovery-azure-classic
|
||||
|
||||
# Configure it
|
||||
sudo vi /etc/elasticsearch/elasticsearch.yml
|
||||
|
@ -441,7 +445,7 @@ sudo service elasticsearch start
|
|||
|
||||
If anything goes wrong, check your logs in `/var/log/elasticsearch`.
|
||||
|
||||
[[discovery-azure-scale]]
|
||||
[[discovery-azure-classic-scale]]
|
||||
==== Scaling Out!
|
||||
|
||||
You need first to create an image of your previous machine.
|
|
@ -13,9 +13,9 @@ The core discovery plugins are:
|
|||
|
||||
The EC2 discovery plugin uses the https://github.com/aws/aws-sdk-java[AWS API] for unicast discovery.
|
||||
|
||||
<<discovery-azure,Azure discovery>>::
|
||||
<<discovery-azure-classic,Azure Classic discovery>>::
|
||||
|
||||
The Azure discovery plugin uses the Azure API for unicast discovery.
|
||||
The Azure Classic discovery plugin uses the Azure Classic API for unicast discovery.
|
||||
|
||||
<<discovery-gce,GCE discovery>>::
|
||||
|
||||
|
@ -33,7 +33,7 @@ A number of discovery plugins have been contributed by our community:
|
|||
|
||||
include::discovery-ec2.asciidoc[]
|
||||
|
||||
include::discovery-azure.asciidoc[]
|
||||
include::discovery-azure-classic.asciidoc[]
|
||||
|
||||
include::discovery-gce.asciidoc[]
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ The `cloud-aws` plugin has been split into two separate plugins:
|
|||
|
||||
The `cloud-azure` plugin has been split into two separate plugins:
|
||||
|
||||
* <<discovery-azure>> (`discovery-azure`)
|
||||
* <<discovery-azure-classic>> (`discovery-azure-classic`)
|
||||
* <<repository-azure>> (`repository-azure`)
|
||||
|
||||
|
||||
|
|
|
@ -6,14 +6,9 @@ The `plugins` command provides a view per node of running plugins. This informat
|
|||
[source,sh]
|
||||
------------------------------------------------------------------------------
|
||||
% curl 'localhost:9200/_cat/plugins?v'
|
||||
name component version type isolation url
|
||||
Abraxas discovery-azure 2.1.0-SNAPSHOT j x
|
||||
Abraxas lang-javascript 2.0.0-SNAPSHOT j x
|
||||
Abraxas marvel NA j/s x /_plugin/marvel/
|
||||
Abraxas lang-python 2.0.0-SNAPSHOT j x
|
||||
Abraxas inquisitor NA s /_plugin/inquisitor/
|
||||
Abraxas kopf 0.5.2 s /_plugin/kopf/
|
||||
Abraxas segmentspy NA s /_plugin/segmentspy/
|
||||
name component version description
|
||||
Abraxas discovery-gce 5.0.0 The Google Compute Engine (GCE) Discovery plugin allows to use GCE API for the unicast discovery mechanism.
|
||||
Abraxas lang-javascript 5.0.0 The JavaScript language plugin allows to have javascript as the language of scripts to execute.
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
We can tell quickly how many plugins per node we have and which versions.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[[elasticsearch-reference]]
|
||||
= Elasticsearch Reference
|
||||
|
||||
:version: 5.0.0-alpha3
|
||||
:version: 5.0.0-alpha4
|
||||
:major-version: 5.x
|
||||
:branch: master
|
||||
:jdk: 1.8.0_73
|
||||
|
|
|
@ -63,7 +63,7 @@ Proxy settings for both plugins have been renamed:
|
|||
|
||||
Cloud Azure plugin has been split in three plugins:
|
||||
|
||||
* {plugins}/discovery-azure.html[Discovery Azure plugin]
|
||||
* {plugins}/discovery-azure-classic.html[Discovery Azure plugin]
|
||||
* {plugins}/repository-azure.html[Repository Azure plugin]
|
||||
* {plugins}/store-smb.html[Store SMB plugin]
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[[modules-discovery-azure]]
|
||||
=== Azure Discovery
|
||||
[[modules-discovery-azure-classic]]
|
||||
=== Azure Classic Discovery
|
||||
|
||||
Azure discovery allows to use the Azure APIs to perform automatic discovery (similar to multicast).
|
||||
It is available as a plugin. See {plugins}/discovery-azure.html[discovery-azure] for more information.
|
||||
Azure classic discovery allows to use the Azure Classic APIs to perform automatic discovery (similar to multicast).
|
||||
It is available as a plugin. See {plugins}/discovery-azure-classic.html[discovery-azure-classic] for more information.
|
||||
|
|
|
@ -83,9 +83,16 @@ First, each document is scored by the defined functions. The parameter
|
|||
`max`:: maximum score is used
|
||||
`min`:: minimum score is used
|
||||
|
||||
Because scores can be on different scales (for example, between 0 and 1 for decay functions but arbitrary for `field_value_factor`) and also because sometimes a different impact of functions on the score is desirable, the score of each function can be adjusted with a user defined `weight` (). The `weight` can be defined per function in the `functions` array (example above) and is multiplied with the score computed by the respective function.
|
||||
Because scores can be on different scales (for example, between 0 and 1 for decay functions but arbitrary for `field_value_factor`) and also
|
||||
because sometimes a different impact of functions on the score is desirable, the score of each function can be adjusted with a user defined
|
||||
`weight`. The `weight` can be defined per function in the `functions` array (example above) and is multiplied with the score computed by
|
||||
the respective function.
|
||||
If weight is given without any other function declaration, `weight` acts as a function that simply returns the `weight`.
|
||||
|
||||
In case `score_mode` is set to `avg` the individual scores will be combined by a **weighted** average.
|
||||
For example, if two functions return score 1 and 2 and their respective weights are 3 and 4, then their scores will be combined as
|
||||
`(1*3+2*4)/(3+4)` and **not** `(1*3+2*4)/2`.
|
||||
|
||||
The new score can be restricted to not exceed a certain limit by setting
|
||||
the `max_boost` parameter. The default for `max_boost` is FLT_MAX.
|
||||
|
||||
|
|
|
@ -699,7 +699,9 @@ The meaning of the stats are as follows:
|
|||
|
||||
This is not currently used and will always report `0`. Currently aggregation profiling only times the shard level parts of the aggregation execution. Timing of the reduce phase will be added later.
|
||||
|
||||
=== Performance Notes
|
||||
=== Profiling Considerations
|
||||
|
||||
==== Performance Notes
|
||||
|
||||
Like any profiler, the Profile API introduces a non-negligible overhead to search execution. The act of instrumenting
|
||||
low-level method calls such as `collect`, `advance` and `next_doc` can be fairly expensive, since these methods are called
|
||||
|
@ -710,7 +712,7 @@ There are also cases where special Lucene optimizations are disabled, since they
|
|||
could cause some queries to report larger relative times than their non-profiled counterparts, but in general should
|
||||
not have a drastic effect compared to other components in the profiled query.
|
||||
|
||||
=== Limitations
|
||||
==== Limitations
|
||||
|
||||
- Profiling statistics are currently not available for suggestions, highlighting, `dfs_query_then_fetch`
|
||||
- Profiling of the reduce phase of aggregation is currently not available
|
||||
|
|
|
@ -21,8 +21,6 @@ package org.elasticsearch.percolator;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.mapper.Mapper;
|
||||
|
@ -39,8 +37,6 @@ import java.util.Map;
|
|||
|
||||
public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlugin {
|
||||
|
||||
public static final String NAME = "percolator";
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
public PercolatorPlugin(Settings settings) {
|
||||
|
@ -65,7 +61,7 @@ public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlug
|
|||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
|
||||
return Collections.singletonList(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -73,7 +69,4 @@ public class PercolatorPlugin extends Plugin implements MapperPlugin, ActionPlug
|
|||
return Collections.singletonMap(PercolatorFieldMapper.CONTENT_TYPE, new PercolatorFieldMapper.TypeParser());
|
||||
}
|
||||
|
||||
static boolean transportClientMode(Settings settings) {
|
||||
return TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,13 +36,10 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
|
|||
public class RestMultiPercolateAction extends BaseRestHandler {
|
||||
|
||||
private final boolean allowExplicitIndex;
|
||||
private final TransportMultiPercolateAction action;
|
||||
|
||||
@Inject
|
||||
public RestMultiPercolateAction(Settings settings, RestController controller, Client client,
|
||||
TransportMultiPercolateAction action) {
|
||||
public RestMultiPercolateAction(Settings settings, RestController controller, Client client) {
|
||||
super(settings, client);
|
||||
this.action = action;
|
||||
controller.registerHandler(POST, "/_mpercolate", this);
|
||||
controller.registerHandler(POST, "/{index}/_mpercolate", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_mpercolate", this);
|
||||
|
@ -61,7 +58,8 @@ public class RestMultiPercolateAction extends BaseRestHandler {
|
|||
multiPercolateRequest.indices(Strings.splitStringByCommaToArray(restRequest.param("index")));
|
||||
multiPercolateRequest.documentType(restRequest.param("type"));
|
||||
multiPercolateRequest.add(RestActions.getRestContent(restRequest), allowExplicitIndex);
|
||||
action.execute(multiPercolateRequest, new RestToXContentListener<MultiPercolateResponse>(restChannel));
|
||||
client.execute(MultiPercolateAction.INSTANCE, multiPercolateRequest,
|
||||
new RestToXContentListener<MultiPercolateResponse>(restChannel));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,13 +36,9 @@ import static org.elasticsearch.rest.RestRequest.Method.GET;
|
|||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||
|
||||
public class RestPercolateAction extends BaseRestHandler {
|
||||
|
||||
private final TransportPercolateAction action;
|
||||
|
||||
@Inject
|
||||
public RestPercolateAction(Settings settings, RestController controller, Client client, TransportPercolateAction action) {
|
||||
public RestPercolateAction(Settings settings, RestController controller, Client client) {
|
||||
super(settings, client);
|
||||
this.action = action;
|
||||
controller.registerHandler(GET, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate", this);
|
||||
|
||||
|
@ -54,7 +50,8 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
controller.registerHandler(GET, "/{index}/{type}/_percolate/count", countHandler);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate/count", countHandler);
|
||||
|
||||
RestCountPercolateExistingDocHandler countExistingDocHandler = new RestCountPercolateExistingDocHandler(settings, controller, client);
|
||||
RestCountPercolateExistingDocHandler countExistingDocHandler = new RestCountPercolateExistingDocHandler(settings, controller,
|
||||
client);
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate/count", countExistingDocHandler);
|
||||
controller.registerHandler(POST, "/{index}/{type}/{id}/_percolate/count", countExistingDocHandler);
|
||||
}
|
||||
|
@ -67,10 +64,11 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
percolateRequest.source(RestActions.getRestContent(restRequest));
|
||||
|
||||
percolateRequest.indicesOptions(IndicesOptions.fromRequest(restRequest, percolateRequest.indicesOptions()));
|
||||
executePercolate(percolateRequest, restChannel);
|
||||
executePercolate(client, percolateRequest, restChannel);
|
||||
}
|
||||
|
||||
void parseExistingDocPercolate(PercolateRequest percolateRequest, RestRequest restRequest, RestChannel restChannel, final Client client) {
|
||||
void parseExistingDocPercolate(PercolateRequest percolateRequest, RestRequest restRequest, RestChannel restChannel,
|
||||
final Client client) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
percolateRequest.indices(Strings.splitStringByCommaToArray(restRequest.param("percolate_index", index)));
|
||||
|
@ -91,11 +89,11 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
percolateRequest.source(RestActions.getRestContent(restRequest));
|
||||
|
||||
percolateRequest.indicesOptions(IndicesOptions.fromRequest(restRequest, percolateRequest.indicesOptions()));
|
||||
executePercolate(percolateRequest, restChannel);
|
||||
executePercolate(client, percolateRequest, restChannel);
|
||||
}
|
||||
|
||||
void executePercolate(final PercolateRequest percolateRequest, final RestChannel restChannel) {
|
||||
action.execute(percolateRequest, new RestToXContentListener<>(restChannel));
|
||||
void executePercolate(final Client client, final PercolateRequest percolateRequest, final RestChannel restChannel) {
|
||||
client.execute(PercolateAction.INSTANCE, percolateRequest, new RestToXContentListener<>(restChannel));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,14 +36,11 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
|
|||
import static org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction.nodeSettingListener;
|
||||
|
||||
public class RestRethrottleAction extends BaseRestHandler {
|
||||
private final TransportRethrottleAction action;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject
|
||||
public RestRethrottleAction(Settings settings, RestController controller, Client client, TransportRethrottleAction action,
|
||||
ClusterService clusterService) {
|
||||
public RestRethrottleAction(Settings settings, RestController controller, Client client, ClusterService clusterService) {
|
||||
super(settings, client);
|
||||
this.action = action;
|
||||
this.clusterService = clusterService;
|
||||
controller.registerHandler(POST, "/_update_by_query/{taskId}/_rethrottle", this);
|
||||
controller.registerHandler(POST, "/_delete_by_query/{taskId}/_rethrottle", this);
|
||||
|
@ -60,6 +57,6 @@ public class RestRethrottleAction extends BaseRestHandler {
|
|||
}
|
||||
internalRequest.setRequestsPerSecond(requestsPerSecond);
|
||||
ActionListener<ListTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
|
||||
action.execute(internalRequest, listener);
|
||||
client.execute(RethrottleAction.INSTANCE, internalRequest, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ import org.elasticsearch.gradle.LoggedExec
|
|||
*/
|
||||
|
||||
esplugin {
|
||||
description 'The Azure Discovery plugin allows to use Azure API for the unicast discovery mechanism'
|
||||
classname 'org.elasticsearch.plugin.discovery.azure.AzureDiscoveryPlugin'
|
||||
description 'The Azure Classic Discovery plugin allows to use Azure Classic API for the unicast discovery mechanism'
|
||||
classname 'org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin'
|
||||
}
|
||||
|
||||
versions << [
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue