Node Stats: Add number of server open channels for transport and http, closes #1115.
This commit is contained in:
parent
c04be9d365
commit
fdbcec8a84
|
@ -21,11 +21,13 @@ package org.elasticsearch.action.admin.cluster.node.info;
|
|||
|
||||
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.http.HttpInfo;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.monitor.network.NetworkInfo;
|
||||
import org.elasticsearch.monitor.os.OsInfo;
|
||||
|
@ -56,12 +58,14 @@ public class NodeInfo extends NodeOperationResponse {
|
|||
|
||||
private TransportInfo transport;
|
||||
|
||||
private HttpInfo http;
|
||||
|
||||
NodeInfo() {
|
||||
}
|
||||
|
||||
public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> attributes, Settings settings,
|
||||
OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network,
|
||||
TransportInfo transport) {
|
||||
TransportInfo transport, @Nullable HttpInfo http) {
|
||||
super(node);
|
||||
this.attributes = attributes;
|
||||
this.settings = settings;
|
||||
|
@ -164,6 +168,14 @@ public class NodeInfo extends NodeOperationResponse {
|
|||
return transport();
|
||||
}
|
||||
|
||||
public HttpInfo http() {
|
||||
return http;
|
||||
}
|
||||
|
||||
public HttpInfo getHttp() {
|
||||
return http();
|
||||
}
|
||||
|
||||
public static NodeInfo readNodeInfo(StreamInput in) throws IOException {
|
||||
NodeInfo nodeInfo = new NodeInfo();
|
||||
nodeInfo.readFrom(in);
|
||||
|
@ -194,6 +206,9 @@ public class NodeInfo extends NodeOperationResponse {
|
|||
if (in.readBoolean()) {
|
||||
transport = TransportInfo.readTransportInfo(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
http = HttpInfo.readHttpInfo(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -234,5 +249,11 @@ public class NodeInfo extends NodeOperationResponse {
|
|||
out.writeBoolean(true);
|
||||
transport.writeTo(out);
|
||||
}
|
||||
if (http == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
http.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,10 +25,12 @@ import org.elasticsearch.action.support.nodes.NodeOperationRequest;
|
|||
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.http.HttpServer;
|
||||
import org.elasticsearch.monitor.MonitorService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -46,6 +48,8 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
|
|||
|
||||
private volatile ImmutableMap<String, String> nodeAttributes = ImmutableMap.of();
|
||||
|
||||
@Nullable private HttpServer httpServer;
|
||||
|
||||
@Inject public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
|
||||
ClusterService clusterService, TransportService transportService,
|
||||
MonitorService monitorService) {
|
||||
|
@ -53,6 +57,10 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
|
|||
this.monitorService = monitorService;
|
||||
}
|
||||
|
||||
public void setHttpServer(@Nullable HttpServer httpServer) {
|
||||
this.httpServer = httpServer;
|
||||
}
|
||||
|
||||
public synchronized void putNodeAttribute(String key, String value) {
|
||||
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).put(key, value).immutableMap();
|
||||
}
|
||||
|
@ -104,7 +112,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
|
|||
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings,
|
||||
monitorService.osService().info(), monitorService.processService().info(),
|
||||
monitorService.jvmService().info(), monitorService.networkService().info(),
|
||||
transportService.info());
|
||||
transportService.info(), httpServer == null ? null : httpServer.info());
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
|
|
|
@ -21,13 +21,16 @@ package org.elasticsearch.action.admin.cluster.node.stats;
|
|||
|
||||
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.http.HttpStats;
|
||||
import org.elasticsearch.indices.NodeIndicesStats;
|
||||
import org.elasticsearch.monitor.jvm.JvmStats;
|
||||
import org.elasticsearch.monitor.network.NetworkStats;
|
||||
import org.elasticsearch.monitor.os.OsStats;
|
||||
import org.elasticsearch.monitor.process.ProcessStats;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -48,17 +51,24 @@ public class NodeStats extends NodeOperationResponse {
|
|||
|
||||
private NetworkStats network;
|
||||
|
||||
private TransportStats transport;
|
||||
|
||||
private HttpStats http;
|
||||
|
||||
NodeStats() {
|
||||
}
|
||||
|
||||
public NodeStats(DiscoveryNode node, NodeIndicesStats indices,
|
||||
OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network) {
|
||||
OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network,
|
||||
TransportStats transport, @Nullable HttpStats http) {
|
||||
super(node);
|
||||
this.indices = indices;
|
||||
this.os = os;
|
||||
this.process = process;
|
||||
this.jvm = jvm;
|
||||
this.network = network;
|
||||
this.transport = transport;
|
||||
this.http = http;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,6 +141,22 @@ public class NodeStats extends NodeOperationResponse {
|
|||
return network();
|
||||
}
|
||||
|
||||
public TransportStats transport() {
|
||||
return this.transport;
|
||||
}
|
||||
|
||||
public TransportStats getTransport() {
|
||||
return transport();
|
||||
}
|
||||
|
||||
public HttpStats http() {
|
||||
return this.http;
|
||||
}
|
||||
|
||||
public HttpStats getHttp() {
|
||||
return http();
|
||||
}
|
||||
|
||||
public static NodeStats readNodeStats(StreamInput in) throws IOException {
|
||||
NodeStats nodeInfo = new NodeStats();
|
||||
nodeInfo.readFrom(in);
|
||||
|
@ -154,6 +180,12 @@ public class NodeStats extends NodeOperationResponse {
|
|||
if (in.readBoolean()) {
|
||||
network = NetworkStats.readNetworkStats(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
transport = TransportStats.readTransportStats(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
http = HttpStats.readHttpStats(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -188,5 +220,17 @@ public class NodeStats extends NodeOperationResponse {
|
|||
out.writeBoolean(true);
|
||||
network.writeTo(out);
|
||||
}
|
||||
if (transport == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
transport.writeTo(out);
|
||||
}
|
||||
if (http == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
http.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,9 +25,11 @@ import org.elasticsearch.action.support.nodes.NodeOperationRequest;
|
|||
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.http.HttpServer;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.monitor.MonitorService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -45,6 +47,8 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
|
|||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
@Nullable private HttpServer httpServer;
|
||||
|
||||
@Inject public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
|
||||
ClusterService clusterService, TransportService transportService,
|
||||
MonitorService monitorService, IndicesService indicesService) {
|
||||
|
@ -53,6 +57,10 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
|
|||
this.indicesService = indicesService;
|
||||
}
|
||||
|
||||
public void setHttpServer(@Nullable HttpServer httpServer) {
|
||||
this.httpServer = httpServer;
|
||||
}
|
||||
|
||||
@Override protected String executor() {
|
||||
return ThreadPool.Names.CACHED;
|
||||
}
|
||||
|
@ -95,7 +103,8 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
|
|||
@Override protected NodeStats nodeOperation(NodeStatsRequest request) throws ElasticSearchException {
|
||||
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(),
|
||||
monitorService.osService().stats(), monitorService.processService().stats(),
|
||||
monitorService.jvmService().stats(), monitorService.networkService().stats());
|
||||
monitorService.jvmService().stats(), monitorService.networkService().stats(),
|
||||
transportService.stats(), httpServer == null ? null : httpServer.stats());
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
|
|
|
@ -19,10 +19,19 @@
|
|||
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.elasticsearch.common.netty.channel.*;
|
||||
import org.elasticsearch.common.netty.channel.Channel;
|
||||
import org.elasticsearch.common.netty.channel.ChannelEvent;
|
||||
import org.elasticsearch.common.netty.channel.ChannelFuture;
|
||||
import org.elasticsearch.common.netty.channel.ChannelFutureListener;
|
||||
import org.elasticsearch.common.netty.channel.ChannelHandler;
|
||||
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
|
||||
import org.elasticsearch.common.netty.channel.ChannelState;
|
||||
import org.elasticsearch.common.netty.channel.ChannelStateEvent;
|
||||
import org.elasticsearch.common.netty.channel.ChannelUpstreamHandler;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -31,10 +40,14 @@ import java.util.Set;
|
|||
public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
||||
|
||||
private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
|
||||
private AtomicLong openChannelsCount = new AtomicLong();
|
||||
|
||||
private final ChannelFutureListener remover = new ChannelFutureListener() {
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
openChannels.remove(future.getChannel());
|
||||
boolean removed = openChannels.remove(future.getChannel());
|
||||
if (removed) {
|
||||
openChannelsCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -44,6 +57,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
|||
if (evt.getState() == ChannelState.OPEN) {
|
||||
boolean added = openChannels.add(ctx.getChannel());
|
||||
if (added) {
|
||||
openChannelsCount.incrementAndGet();
|
||||
ctx.getChannel().getCloseFuture().addListener(remover);
|
||||
}
|
||||
}
|
||||
|
@ -51,6 +65,10 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
|
|||
ctx.sendUpstream(e);
|
||||
}
|
||||
|
||||
public long numberOfOpenChannels() {
|
||||
return openChannelsCount.get();
|
||||
}
|
||||
|
||||
public void close() {
|
||||
for (Channel channel : openChannels) {
|
||||
channel.close().awaitUninterruptibly();
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class HttpInfo implements Streamable, Serializable, ToXContent {
|
||||
|
||||
private BoundTransportAddress address;
|
||||
|
||||
HttpInfo() {
|
||||
}
|
||||
|
||||
public HttpInfo(BoundTransportAddress address) {
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("http");
|
||||
builder.field("bound_address", address.boundAddress().toString());
|
||||
builder.field("publish_address", address.publishAddress().toString());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static HttpInfo readHttpInfo(StreamInput in) throws IOException {
|
||||
HttpInfo info = new HttpInfo();
|
||||
info.readFrom(in);
|
||||
return info;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
address = BoundTransportAddress.readBoundTransportAddress(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
address.writeTo(out);
|
||||
}
|
||||
|
||||
public BoundTransportAddress address() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public BoundTransportAddress getAddress() {
|
||||
return address();
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.http;
|
|||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -55,12 +56,16 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
|
|||
private final boolean disableSites;
|
||||
|
||||
@Inject public HttpServer(Settings settings, Environment environment, HttpServerTransport transport,
|
||||
RestController restController, TransportNodesInfoAction nodesInfoAction) {
|
||||
RestController restController,
|
||||
TransportNodesInfoAction nodesInfoAction, TransportNodesStatsAction nodesStatsAction) {
|
||||
super(settings);
|
||||
this.environment = environment;
|
||||
this.transport = transport;
|
||||
this.restController = restController;
|
||||
this.nodesInfoAction = nodesInfoAction;
|
||||
this.nodesInfoAction.setHttpServer(this);
|
||||
|
||||
nodesStatsAction.setHttpServer(this);
|
||||
|
||||
this.disableSites = componentSettings.getAsBoolean("disable_sites", false);
|
||||
|
||||
|
@ -97,6 +102,14 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
|
|||
transport.close();
|
||||
}
|
||||
|
||||
public HttpInfo info() {
|
||||
return new HttpInfo(transport.boundAddress());
|
||||
}
|
||||
|
||||
public HttpStats stats() {
|
||||
return transport.stats();
|
||||
}
|
||||
|
||||
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
|
||||
if (request.rawPath().startsWith("/_plugin/")) {
|
||||
handlePluginSite(request, channel);
|
||||
|
|
|
@ -29,5 +29,7 @@ public interface HttpServerTransport extends LifecycleComponent<HttpServerTransp
|
|||
|
||||
BoundTransportAddress boundAddress();
|
||||
|
||||
HttpStats stats();
|
||||
|
||||
void httpServerAdapter(HttpServerAdapter httpServerAdapter);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class HttpStats implements Streamable, ToXContent {
|
||||
|
||||
private long serverOpen;
|
||||
|
||||
HttpStats() {
|
||||
|
||||
}
|
||||
|
||||
public HttpStats(long serverOpen) {
|
||||
this.serverOpen = serverOpen;
|
||||
}
|
||||
|
||||
public long serverOpen() {
|
||||
return this.serverOpen;
|
||||
}
|
||||
|
||||
public long getServerOpen() {
|
||||
return serverOpen();
|
||||
}
|
||||
|
||||
public static HttpStats readHttpStats(StreamInput in) throws IOException {
|
||||
HttpStats stats = new HttpStats();
|
||||
stats.readFrom(in);
|
||||
return stats;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
serverOpen = in.readVLong();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(serverOpen);
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("http");
|
||||
builder.field("server_open", serverOpen);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -24,10 +24,19 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.netty.bootstrap.ServerBootstrap;
|
||||
import org.elasticsearch.common.netty.channel.*;
|
||||
import org.elasticsearch.common.netty.channel.Channel;
|
||||
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
|
||||
import org.elasticsearch.common.netty.channel.ChannelPipeline;
|
||||
import org.elasticsearch.common.netty.channel.ChannelPipelineFactory;
|
||||
import org.elasticsearch.common.netty.channel.Channels;
|
||||
import org.elasticsearch.common.netty.channel.ExceptionEvent;
|
||||
import org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||
import org.elasticsearch.common.netty.channel.socket.oio.OioServerSocketChannelFactory;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.*;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpChunkAggregator;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpContentCompressor;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpContentDecompressor;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpRequestDecoder;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import org.elasticsearch.common.netty.handler.timeout.ReadTimeoutException;
|
||||
import org.elasticsearch.common.netty.logging.InternalLogger;
|
||||
import org.elasticsearch.common.netty.logging.InternalLoggerFactory;
|
||||
|
@ -40,8 +49,12 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper;
|
|||
import org.elasticsearch.common.transport.PortsRange;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.http.*;
|
||||
import org.elasticsearch.http.BindHttpException;
|
||||
import org.elasticsearch.http.HttpChannel;
|
||||
import org.elasticsearch.http.HttpRequest;
|
||||
import org.elasticsearch.http.HttpServerAdapter;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.HttpStats;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
|
||||
|
||||
|
@ -249,6 +262,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
|||
return this.boundAddress;
|
||||
}
|
||||
|
||||
@Override public HttpStats stats() {
|
||||
return new HttpStats(serverOpenChannels.numberOfOpenChannels());
|
||||
}
|
||||
|
||||
void dispatchRequest(HttpRequest request, HttpChannel channel) {
|
||||
httpServerAdapter.dispatchRequest(request, channel);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,13 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.XContentRestResponse;
|
||||
import org.elasticsearch.rest.XContentThrowableRestResponse;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
|
@ -104,6 +110,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
|
|||
if (nodeInfo.transport() != null) {
|
||||
nodeInfo.transport().toXContent(builder, request);
|
||||
}
|
||||
if (nodeInfo.http() != null) {
|
||||
nodeInfo.http().toXContent(builder, request);
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
|
|
|
@ -83,6 +83,12 @@ public class RestNodesStatsAction extends BaseRestHandler {
|
|||
if (nodeStats.network() != null) {
|
||||
nodeStats.network().toXContent(builder, request);
|
||||
}
|
||||
if (nodeStats.transport() != null) {
|
||||
nodeStats.transport().toXContent(builder, request);
|
||||
}
|
||||
if (nodeStats.http() != null) {
|
||||
nodeStats.http().toXContent(builder, request);
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
|
|
|
@ -74,4 +74,6 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
* Sends the request to the node.
|
||||
*/
|
||||
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException;
|
||||
|
||||
TransportStats stats();
|
||||
}
|
||||
|
|
|
@ -105,6 +105,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
return new TransportInfo(boundAddress());
|
||||
}
|
||||
|
||||
public TransportStats stats() {
|
||||
return transport.stats();
|
||||
}
|
||||
|
||||
public BoundTransportAddress boundAddress() {
|
||||
return transport.boundAddress();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TransportStats implements Streamable, ToXContent {
|
||||
|
||||
private long serverOpen;
|
||||
|
||||
TransportStats() {
|
||||
|
||||
}
|
||||
|
||||
public TransportStats(long serverOpen) {
|
||||
this.serverOpen = serverOpen;
|
||||
}
|
||||
|
||||
public long serverOpen() {
|
||||
return this.serverOpen;
|
||||
}
|
||||
|
||||
public long getServerOpen() {
|
||||
return serverOpen();
|
||||
}
|
||||
|
||||
public static TransportStats readTransportStats(StreamInput in) throws IOException {
|
||||
TransportStats stats = new TransportStats();
|
||||
stats.readFrom(in);
|
||||
return stats;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
serverOpen = in.readVLong();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(serverOpen);
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("transport");
|
||||
builder.field("server_open", serverOpen);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -144,6 +144,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
}
|
||||
|
||||
@Override public TransportStats stats() {
|
||||
return new TransportStats(0);
|
||||
}
|
||||
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.elasticsearch.transport.Transport;
|
|||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||
import org.elasticsearch.transport.TransportStats;
|
||||
import org.elasticsearch.transport.support.TransportStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -438,6 +439,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
|
||||
}
|
||||
|
||||
@Override public TransportStats stats() {
|
||||
return new TransportStats(serverOpenChannels.numberOfOpenChannels());
|
||||
}
|
||||
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
|
||||
Channel targetChannel = nodeChannel(node, options);
|
||||
|
||||
|
|
|
@ -218,7 +218,7 @@ public class FullRestartStressTest {
|
|||
int numberOfNodes = 2;
|
||||
Settings settings = ImmutableSettings.settingsBuilder()
|
||||
.put("index.shard.check_index", true)
|
||||
.put("gateway.type", "fs")
|
||||
.put("gateway.type", "local")
|
||||
.put("gateway.recover_after_nodes", numberOfNodes)
|
||||
.put("index.number_of_shards", 1)
|
||||
.build();
|
||||
|
|
Loading…
Reference in New Issue