diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index 2f311fab919..493359b5cb4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -21,11 +21,13 @@ package org.elasticsearch.action.admin.cluster.node.info; import org.elasticsearch.action.support.nodes.NodeOperationResponse; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.http.HttpInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.network.NetworkInfo; import org.elasticsearch.monitor.os.OsInfo; @@ -56,12 +58,14 @@ public class NodeInfo extends NodeOperationResponse { private TransportInfo transport; + private HttpInfo http; + NodeInfo() { } public NodeInfo(DiscoveryNode node, ImmutableMap attributes, Settings settings, OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, - TransportInfo transport) { + TransportInfo transport, @Nullable HttpInfo http) { super(node); this.attributes = attributes; this.settings = settings; @@ -164,6 +168,14 @@ public class NodeInfo extends NodeOperationResponse { return transport(); } + public HttpInfo http() { + return http; + } + + public HttpInfo getHttp() { + return http(); + } + public static NodeInfo readNodeInfo(StreamInput in) throws IOException { NodeInfo nodeInfo = new NodeInfo(); nodeInfo.readFrom(in); @@ -194,6 +206,9 @@ public class NodeInfo extends NodeOperationResponse { if (in.readBoolean()) { transport = TransportInfo.readTransportInfo(in); } + if (in.readBoolean()) { + http = HttpInfo.readHttpInfo(in); + } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -234,5 +249,11 @@ public class NodeInfo extends NodeOperationResponse { out.writeBoolean(true); transport.writeTo(out); } + if (http == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + http.writeTo(out); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index 3dad2b17898..e67b5eba2f8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -25,10 +25,12 @@ import org.elasticsearch.action.support.nodes.NodeOperationRequest; import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.http.HttpServer; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -46,6 +48,8 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction nodeAttributes = ImmutableMap.of(); + @Nullable private HttpServer httpServer; + @Inject public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, MonitorService monitorService) { @@ -53,6 +57,10 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction().putAll(nodeAttributes).put(key, value).immutableMap(); } @@ -104,7 +112,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction openChannels = ConcurrentCollections.newConcurrentSet(); + private AtomicLong openChannelsCount = new AtomicLong(); private final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { - openChannels.remove(future.getChannel()); + boolean removed = openChannels.remove(future.getChannel()); + if (removed) { + openChannelsCount.decrementAndGet(); + } } }; @@ -44,6 +57,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler { if (evt.getState() == ChannelState.OPEN) { boolean added = openChannels.add(ctx.getChannel()); if (added) { + openChannelsCount.incrementAndGet(); ctx.getChannel().getCloseFuture().addListener(remover); } } @@ -51,6 +65,10 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler { ctx.sendUpstream(e); } + public long numberOfOpenChannels() { + return openChannelsCount.get(); + } + public void close() { for (Channel channel : openChannels) { channel.close().awaitUninterruptibly(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpInfo.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpInfo.java new file mode 100644 index 00000000000..3711b1f5d5e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpInfo.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.Serializable; + +/** + * @author kimchy (shay.banon) + */ +public class HttpInfo implements Streamable, Serializable, ToXContent { + + private BoundTransportAddress address; + + HttpInfo() { + } + + public HttpInfo(BoundTransportAddress address) { + this.address = address; + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("http"); + builder.field("bound_address", address.boundAddress().toString()); + builder.field("publish_address", address.publishAddress().toString()); + builder.endObject(); + return builder; + } + + public static HttpInfo readHttpInfo(StreamInput in) throws IOException { + HttpInfo info = new HttpInfo(); + info.readFrom(in); + return info; + } + + @Override public void readFrom(StreamInput in) throws IOException { + address = BoundTransportAddress.readBoundTransportAddress(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + address.writeTo(out); + } + + public BoundTransportAddress address() { + return address; + } + + public BoundTransportAddress getAddress() { + return address(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java index e0defa62bc4..cc6d32fd79b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java @@ -21,6 +21,7 @@ package org.elasticsearch.http; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -55,12 +56,16 @@ public class HttpServer extends AbstractLifecycleComponent { private final boolean disableSites; @Inject public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, - RestController restController, TransportNodesInfoAction nodesInfoAction) { + RestController restController, + TransportNodesInfoAction nodesInfoAction, TransportNodesStatsAction nodesStatsAction) { super(settings); this.environment = environment; this.transport = transport; this.restController = restController; this.nodesInfoAction = nodesInfoAction; + this.nodesInfoAction.setHttpServer(this); + + nodesStatsAction.setHttpServer(this); this.disableSites = componentSettings.getAsBoolean("disable_sites", false); @@ -97,6 +102,14 @@ public class HttpServer extends AbstractLifecycleComponent { transport.close(); } + public HttpInfo info() { + return new HttpInfo(transport.boundAddress()); + } + + public HttpStats stats() { + return transport.stats(); + } + public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) { if (request.rawPath().startsWith("/_plugin/")) { handlePluginSite(request, channel); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServerTransport.java index 3a81fc6b2d2..c57f5a43af4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServerTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServerTransport.java @@ -29,5 +29,7 @@ public interface HttpServerTransport extends LifecycleComponent { * Sends the request to the node. */ void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException; + + TransportStats stats(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index dfcac100c12..74004dce33d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -105,6 +105,10 @@ public class TransportService extends AbstractLifecycleComponent implem } } + @Override public TransportStats stats() { + return new TransportStats(0); + } + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index cdb1abd7e21..e97c8798827 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -65,6 +65,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import org.elasticsearch.transport.support.TransportStreams; import java.io.IOException; @@ -438,6 +439,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem return new InetSocketTransportAddress((InetSocketAddress) socketAddress); } + @Override public TransportStats stats() { + return new TransportStats(serverOpenChannels.numberOfOpenChannels()); + } + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { Channel targetChannel = nodeChannel(node, options); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java index 7e6bb25b1fa..ac8e60afada 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java @@ -218,7 +218,7 @@ public class FullRestartStressTest { int numberOfNodes = 2; Settings settings = ImmutableSettings.settingsBuilder() .put("index.shard.check_index", true) - .put("gateway.type", "fs") + .put("gateway.type", "local") .put("gateway.recover_after_nodes", numberOfNodes) .put("index.number_of_shards", 1) .build();